Merge pull request #167 from jboner/wip-1487-future-doc-patriknw

DOC: Update Future (Scala) Chapter. See #1487
This commit is contained in:
patriknw 2011-12-15 14:48:46 -08:00
commit 51781962ec
5 changed files with 651 additions and 364 deletions

View file

@ -0,0 +1,5 @@
package akka.docs.future
import org.scalatest.junit.JUnitSuite
class FutureDocTest extends FutureDocTestBase with JUnitSuite

View file

@ -0,0 +1,325 @@
package akka.docs.future;
//#imports1
import akka.util.Timeout;
import akka.dispatch.Await;
import akka.dispatch.Future;
//#imports1
//#imports2
import akka.util.Duration;
import akka.japi.Function;
import java.util.concurrent.Callable;
import static akka.dispatch.Futures.future;
import static java.util.concurrent.TimeUnit.SECONDS;
//#imports2
//#imports3
import static akka.dispatch.Futures.sequence;
//#imports3
//#imports4
import static akka.dispatch.Futures.traverse;
//#imports4
//#imports5
import akka.japi.Function2;
import static akka.dispatch.Futures.fold;
//#imports5
//#imports6
import static akka.dispatch.Futures.reduce;
//#imports6
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import akka.testkit.AkkaSpec;
import akka.actor.Status.Failure;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.actor.ActorRef;
import akka.docs.actor.MyUntypedActor;
import akka.actor.Props;
import static org.junit.Assert.*;
public class FutureDocTestBase {
ActorSystem system;
@Before
public void setUp() {
system = ActorSystem.create("MySystem", AkkaSpec.testConf());
}
@After
public void tearDown() {
system.shutdown();
}
@Test
public void useBlockingFromActor() {
ActorRef actor = system.actorOf(new Props(MyActor.class));
String msg = "hello";
//#ask-blocking
Timeout timeout = system.settings().ActorTimeout();
Future<Object> future = actor.ask(msg, timeout);
String result = (String) Await.result(future, timeout.duration());
//#ask-blocking
assertEquals("HELLO", result);
}
@Test
public void useFutureEval() {
//#future-eval
Future<String> f = future(new Callable<String>() {
public String call() {
return "Hello" + "World";
}
}, system.dispatcher());
String result = (String) Await.result(f, Duration.create(1, SECONDS));
//#future-eval
assertEquals("HelloWorld", result);
}
@Test
public void useMap() {
//#map
Future<String> f1 = future(new Callable<String>() {
public String call() {
return "Hello" + "World";
}
}, system.dispatcher());
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
public Integer apply(String s) {
return s.length();
}
});
int result = Await.result(f2, Duration.create(1, SECONDS));
assertEquals(10, result);
//#map
}
@Test
public void useMap2() throws Exception {
//#map2
Future<String> f1 = future(new Callable<String>() {
public String call() throws Exception {
Thread.sleep(100);
return "Hello" + "World";
}
}, system.dispatcher());
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
public Integer apply(String s) {
return s.length();
}
});
//#map2
int result = Await.result(f2, Duration.create(1, SECONDS));
assertEquals(10, result);
}
@Test
public void useMap3() throws Exception {
//#map3
Future<String> f1 = future(new Callable<String>() {
public String call() {
return "Hello" + "World";
}
}, system.dispatcher());
Thread.sleep(100);
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
public Integer apply(String s) {
return s.length();
}
});
//#map3
int result = Await.result(f2, Duration.create(1, SECONDS));
assertEquals(10, result);
}
@Test
public void useFlatMap() {
//#flat-map
Future<String> f1 = future(new Callable<String>() {
public String call() {
return "Hello" + "World";
}
}, system.dispatcher());
Future<Integer> f2 = f1.flatMap(new Function<String, Future<Integer>>() {
public Future<Integer> apply(final String s) {
return future(new Callable<Integer>() {
public Integer call() {
return s.length();
}
}, system.dispatcher());
}
});
//#flat-map
int result = Await.result(f2, Duration.create(1, SECONDS));
assertEquals(10, result);
}
@Test
public void useSequence() {
Future<Integer> f1 = future(new Callable<Integer>() {
public Integer call() {
return 1;
}
}, system.dispatcher());
Future<Integer> f2 = future(new Callable<Integer>() {
public Integer call() {
return 2;
}
}, system.dispatcher());
List<Future<Integer>> source = new ArrayList<Future<Integer>>();
source.add(f1);
source.add(f2);
//#sequence
//Some source generating a sequence of Future<Integer>:s
Iterable<Future<Integer>> listOfFutureInts = source;
// now we have a Future[Iterable[Integer]]
Future<Iterable<Integer>> futureListOfInts = sequence(listOfFutureInts, system.dispatcher());
// Find the sum of the odd numbers
Future<Long> futureSum = futureListOfInts.map(new Function<Iterable<Integer>, Long>() {
public Long apply(Iterable<Integer> ints) {
long sum = 0;
for (Integer i : ints)
sum += i;
return sum;
}
});
long result = Await.result(futureSum, Duration.create(1, SECONDS));
//#sequence
assertEquals(3L, result);
}
@Test
public void useTraverse() {
//#traverse
//Just a sequence of Strings
Iterable<String> listStrings = Arrays.asList("a", "b", "c");
Future<Iterable<String>> futureResult = traverse(listStrings, new Function<String, Future<String>>() {
public Future<String> apply(final String r) {
return future(new Callable<String>() {
public String call() {
return r.toUpperCase();
}
}, system.dispatcher());
}
}, system.dispatcher());
//Returns the sequence of strings as upper case
Iterable<String> result = Await.result(futureResult, Duration.create(1, SECONDS));
assertEquals(Arrays.asList("A", "B", "C"), result);
//#traverse
}
@Test
public void useFold() {
Future<String> f1 = future(new Callable<String>() {
public String call() {
return "a";
}
}, system.dispatcher());
Future<String> f2 = future(new Callable<String>() {
public String call() {
return "b";
}
}, system.dispatcher());
List<Future<String>> source = new ArrayList<Future<String>>();
source.add(f1);
source.add(f2);
//#fold
//A sequence of Futures, in this case Strings
Iterable<Future<String>> futures = source;
//Start value is the empty string
Future<String> resultFuture = fold("", futures, new Function2<String, String, String>() {
public String apply(String r, String t) {
return r + t; //Just concatenate
}
}, system.dispatcher());
String result = Await.result(resultFuture, Duration.create(1, SECONDS));
//#fold
assertEquals("ab", result);
}
@Test
public void useReduce() {
Future<String> f1 = future(new Callable<String>() {
public String call() {
return "a";
}
}, system.dispatcher());
Future<String> f2 = future(new Callable<String>() {
public String call() {
return "b";
}
}, system.dispatcher());
List<Future<String>> source = new ArrayList<Future<String>>();
source.add(f1);
source.add(f2);
//#reduce
//A sequence of Futures, in this case Strings
Iterable<Future<String>> futures = source;
Future<String> resultFuture = reduce(futures, new Function2<String, String, String>() {
public String apply(String r, String t) {
return r + t; //Just concatenate
}
}, system.dispatcher());
String result = Await.result(resultFuture, Duration.create(1, SECONDS));
//#reduce
assertEquals("ab", result);
}
public static class MyActor extends UntypedActor {
public void onReceive(Object message) {
if (message instanceof String) {
getSender().tell(((String) message).toUpperCase());
} else if (message instanceof Integer) {
int i = ((Integer) message).intValue();
if (i < 0) {
getSender().tell(new Failure(new ArithmeticException("Negative values not supported")));
} else {
getSender().tell(i);
}
}
}
}
}

View file

@ -15,150 +15,60 @@ In Akka, a `Future <http://en.wikipedia.org/wiki/Futures_and_promises>`_ is a da
Use with Actors Use with Actors
--------------- ---------------
There are generally two ways of getting a reply from an ``UntypedActor``: the first is by a sent message (``actorRef.tell(msg);``), which only works if the original sender was an ``UntypedActor``) and the second is through a ``Future``. There are generally two ways of getting a reply from an ``UntypedActor``: the first is by a sent message (``actorRef.tell(msg)``), which only works if the original sender was an ``UntypedActor``) and the second is through a ``Future``.
Using the ``ActorRef``\'s ``ask`` method to send a message will return a Future. To wait for and retrieve the actual result the simplest method is: Using the ``ActorRef``\'s ``ask`` method to send a message will return a Future. To wait for and retrieve the actual result the simplest method is:
.. code-block:: java .. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: imports1,ask-blocking
Future[Object] future = actorRef.ask[Object](msg); This will cause the current thread to block and wait for the ``UntypedActor`` to 'complete' the ``Future`` with it's reply. Blocking is discouraged though as it can cause performance problem.
Object result = future.get(); //Block until result is available, usually bad practice The blocking operations are located in ``Await.result`` and ``Await.ready`` to make it easy to spot where blocking occurs. Alternatives to blocking are discussed further within this documentation.
Also note that the ``Future`` returned by an ``UntypedActor`` is a ``Future<Object>`` since an ``UntypedActor`` is dynamic. That is why the cast to ``String`` is used in the above sample.
This will cause the current thread to block and wait for the ``UntypedActor`` to 'complete' the ``Future`` with it's reply. Due to the dynamic nature of Akka's ``UntypedActor``\s this result can be anything. The safest way to deal with this is to specify the result to an ``Object`` as is shown in the above example. You can also use the expected result type instead of ``Any``, but if an unexpected type were to be returned you will get a ``ClassCastException``. For more elegant ways to deal with this and to use the result without blocking, refer to `Functional Futures`_.
Use Directly Use Directly
------------ ------------
A common use case within Akka is to have some computation performed concurrently without needing the extra utility of an ``UntypedActor``. If you find yourself creating a pool of ``UntypedActor``\s for the sole reason of performing a calculation in parallel, there is an easier (and faster) way: A common use case within Akka is to have some computation performed concurrently without needing the extra utility of an ``UntypedActor``. If you find yourself creating a pool of ``UntypedActor``\s for the sole reason of performing a calculation in parallel, there is an easier (and faster) way:
.. code-block:: java .. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: imports2,future-eval
import akka.dispatch.Future;
import static akka.dispatch.Futures.future;
import java.util.concurrent.Callable;
Future<String> f = future(new Callable<String>() {
public String call() {
return "Hello" + "World!";
}
});
String result = f.get(); //Blocks until timeout, default timeout is set in :ref:`configuration`, otherwise 5 seconds
In the above code the block passed to ``future`` will be executed by the default ``Dispatcher``, with the return value of the block used to complete the ``Future`` (in this case, the result would be the string: "HelloWorld"). Unlike a ``Future`` that is returned from an ``UntypedActor``, this ``Future`` is properly typed, and we also avoid the overhead of managing an ``UntypedActor``. In the above code the block passed to ``future`` will be executed by the default ``Dispatcher``, with the return value of the block used to complete the ``Future`` (in this case, the result would be the string: "HelloWorld"). Unlike a ``Future`` that is returned from an ``UntypedActor``, this ``Future`` is properly typed, and we also avoid the overhead of managing an ``UntypedActor``.
Functional Futures Functional Futures
------------------ ------------------
A recent addition to Akka's ``Future`` is several monadic methods that are very similar to the ones used by ``Scala``'s collections. These allow you to create 'pipelines' or 'streams' that the result will travel through. Akka's ``Future`` has several monadic methods that are very similar to the ones used by ``Scala``'s collections. These allow you to create 'pipelines' or 'streams' that the result will travel through.
Future is a Monad Future is a Monad
^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^
The first method for working with ``Future`` functionally is ``map``. This method takes a ``Function`` which performs some operation on the result of the ``Future``, and returning a new result. The return value of the ``map`` method is another ``Future`` that will contain the new result: The first method for working with ``Future`` functionally is ``map``. This method takes a ``Function`` which performs some operation on the result of the ``Future``, and returning a new result. The return value of the ``map`` method is another ``Future`` that will contain the new result:
.. code-block:: java .. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: imports2,map
import akka.dispatch.Future;
import static akka.dispatch.Futures.future;
import static akka.japi.Function;
import java.util.concurrent.Callable;
Future<String> f1 = future(new Callable<String>() {
public String call() {
return "Hello" + "World";
}
});
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
public Integer apply(String s) {
return s.length();
}
});
Integer result = f2.get();
In this example we are joining two strings together within a Future. Instead of waiting for f1 to complete, we apply our function that calculates the length of the string using the ``map`` method. Now we have a second Future, f2, that will eventually contain an ``Integer``. When our original ``Future``, f1, completes, it will also apply our function and complete the second Future with it's result. When we finally ``get`` the result, it will contain the number 10. Our original Future still contains the string "HelloWorld" and is unaffected by the ``map``. In this example we are joining two strings together within a Future. Instead of waiting for f1 to complete, we apply our function that calculates the length of the string using the ``map`` method. Now we have a second Future, f2, that will eventually contain an ``Integer``. When our original ``Future``, f1, completes, it will also apply our function and complete the second Future with it's result. When we finally ``get`` the result, it will contain the number 10. Our original Future still contains the string "HelloWorld" and is unaffected by the ``map``.
Something to note when using these methods: if the ``Future`` is still being processed when one of these methods are called, it will be the completing thread that actually does the work. If the ``Future`` is already complete though, it will be run in our current thread. For example: Something to note when using these methods: if the ``Future`` is still being processed when one of these methods are called, it will be the completing thread that actually does the work. If the ``Future`` is already complete though, it will be run in our current thread. For example:
.. code-block:: java .. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: map2
import akka.dispatch.Future; The original ``Future`` will take at least 0.1 second to execute now, which means it is still being processed at the time we call ``map``. The function we provide gets stored within the ``Future`` and later executed automatically by the dispatcher when the result is ready.
import static akka.dispatch.Futures.future;
import static akka.japi.Function;
import java.util.concurrent.Callable;
Future<String> f1 = future(new Callable<String>() {
public String call() {
Thread.sleep(1000);
return "Hello" + "World";
}
});
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
public Integer apply(String s) {
return s.length();
}
});
Integer result = f2.get();
The original ``Future`` will take at least 1 second to execute now, which means it is still being processed at the time we call ``map``. The function we provide gets stored within the ``Future`` and later executed automatically by the dispatcher when the result is ready.
If we do the opposite: If we do the opposite:
.. code-block:: java .. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: map3
import akka.dispatch.Future; Our little string has been processed long before our 0.1 second sleep has finished. Because of this, the dispatcher has moved onto other messages that need processing and can no longer calculate the length of the string for us, instead it gets calculated in the current thread just as if we weren't using a ``Future``.
import static akka.dispatch.Futures.future;
import static akka.japi.Function;
import java.util.concurrent.Callable;
Future<String> f1 = future(new Callable<String>() {
public String call() {
return "Hello" + "World";
}
});
Thread.sleep(1000);
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
public Integer apply(String s) {
return s.length();
}
});
Integer result = f2.get();
Our little string has been processed long before our 1 second sleep has finished. Because of this, the dispatcher has moved onto other messages that need processing and can no longer calculate the length of the string for us, instead it gets calculated in the current thread just as if we weren't using a ``Future``.
Normally this works quite well as it means there is very little overhead to running a quick function. If there is a possibility of the function taking a non-trivial amount of time to process it might be better to have this done concurrently, and for that we use ``flatMap``: Normally this works quite well as it means there is very little overhead to running a quick function. If there is a possibility of the function taking a non-trivial amount of time to process it might be better to have this done concurrently, and for that we use ``flatMap``:
.. code-block:: java .. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: flat-map
import akka.dispatch.Future;
import static akka.dispatch.Futures.future;
import static akka.japi.Function;
import java.util.concurrent.Callable;
Future<String> f1 = future(new Callable<String>() {
public String call() {
return "Hello" + "World";
}
});
Future<Integer> f2 = f1.flatMap(new Function<String, Future<Integer>>() {
public Future<Integer> apply(final String s) {
return future(
new Callable<Integer>() {
public Integer call() {
return s.length();
}
});
}
});
Integer result = f2.get();
Now our second Future is executed concurrently as well. This technique can also be used to combine the results of several Futures into a single calculation, which will be better explained in the following sections. Now our second Future is executed concurrently as well. This technique can also be used to combine the results of several Futures into a single calculation, which will be better explained in the following sections.
@ -167,97 +77,30 @@ Composing Futures
It is very often desirable to be able to combine different Futures with eachother, below are some examples on how that can be done in a non-blocking fashion. It is very often desirable to be able to combine different Futures with eachother, below are some examples on how that can be done in a non-blocking fashion.
.. code-block:: java .. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: imports3,sequence
import akka.dispatch.Future;
import static akka.dispatch.Futures.sequence;
import akka.japi.Function;
import java.lang.Iterable;
Iterable<Future<Integer>> listOfFutureInts = ... //Some source generating a sequence of Future<Integer>:s
// now we have a Future[Iterable[Int]]
Future<Iterable<Integer>> futureListOfInts = sequence(listOfFutureInts);
// Find the sum of the odd numbers
Long totalSum = futureListOfInts.map(
new Function<LinkedList<Integer>, Long>() {
public Long apply(LinkedList<Integer> ints) {
long sum = 0;
for(Integer i : ints)
sum += i;
return sum;
}
}).get();
To better explain what happened in the example, ``Future.sequence`` is taking the ``Iterable<Future<Integer>>`` and turning it into a ``Future<Iterable<Integer>>``. We can then use ``map`` to work with the ``Iterable<Integer>`` directly, and we aggregate the sum of the ``Iterable``. To better explain what happened in the example, ``Future.sequence`` is taking the ``Iterable<Future<Integer>>`` and turning it into a ``Future<Iterable<Integer>>``. We can then use ``map`` to work with the ``Iterable<Integer>`` directly, and we aggregate the sum of the ``Iterable``.
The ``traverse`` method is similar to ``sequence``, but it takes a sequence of ``A``s and applies a function from ``A`` to ``Future<B>`` and returns a ``Future<Iterable<B>>``, enabling parallel ``map`` over the sequence, if you use ``Futures.future`` to create the ``Future``. The ``traverse`` method is similar to ``sequence``, but it takes a sequence of ``A``s and applies a function from ``A`` to ``Future<B>`` and returns a ``Future<Iterable<B>>``, enabling parallel ``map`` over the sequence, if you use ``Futures.future`` to create the ``Future``.
.. code-block:: java .. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: imports4,traverse
import akka.dispatch.Future;
import static akka.dispatch.Futures.traverse;
import static akka.dispatch.Futures.future;
import java.lang.Iterable;
import akka.japi.Function;
Iterable<String> listStrings = ... //Just a sequence of Strings
Future<Iterable<String>> result = traverse(listStrings, new Function<String,Future<String>>() {
public Future<String> apply(final String r) {
return future(new Callable<String>() {
public String call() {
return r.toUpperCase();
}
});
}
});
result.get(); //Returns the sequence of strings as upper case
It's as simple as that! It's as simple as that!
Then there's a method that's called ``fold`` that takes a start-value, a sequence of ``Future``:s and a function from the type of the start-value, a timeout, and the type of the futures and returns something with the same type as the start-value, and then applies the function to all elements in the sequence of futures, non-blockingly, the execution will run on the Thread of the last completing Future in the sequence. Then there's a method that's called ``fold`` that takes a start-value, a sequence of ``Future``:s and a function from the type of the start-value, a timeout, and the type of the futures and returns something with the same type as the start-value, and then applies the function to all elements in the sequence of futures, non-blockingly, the execution will run on the Thread of the last completing Future in the sequence.
.. code-block:: java .. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: imports5,fold
import akka.dispatch.Future;
import static akka.dispatch.Futures.fold;
import java.lang.Iterable;
import akka.japi.Function2;
Iterable<Future<String>> futures = ... //A sequence of Futures, in this case Strings
Future<String> result = fold("", 15000, futures, new Function2<String, String, String>() { //Start value is the empty string, timeout is 15 seconds
public String apply(String r, String t) {
return r + t; //Just concatenate
}
});
result.get(); // Will produce a String that says "testtesttesttest"(... and so on).
That's all it takes! That's all it takes!
If the sequence passed to ``fold`` is empty, it will return the start-value, in the case above, that will be 0. In some cases you don't have a start-value and you're able to use the value of the first completing Future in the sequence as the start-value, you can use ``reduce``, it works like this: If the sequence passed to ``fold`` is empty, it will return the start-value, in the case above, that will be empty String. In some cases you don't have a start-value and you're able to use the value of the first completing Future in the sequence as the start-value, you can use ``reduce``, it works like this:
.. code-block:: java .. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: imports6,reduce
import akka.dispatch.Future;
import static akka.dispatch.Futures.reduce;
import java.util.Iterable;
import akka.japi.Function2;
Iterable<Future<String>> futures = ... //A sequence of Futures, in this case Strings
Future<String> result = reduce(futures, 15000, new Function2<String, String, String>() { //Timeout is 15 seconds
public String apply(String r, String t) {
return r + t; //Just concatenate
}
});
result.get(); // Will produce a String that says "testtesttesttest"(... and so on).
Same as with ``fold``, the execution will be done by the Thread that completes the last of the Futures, you can also parallize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again. Same as with ``fold``, the execution will be done by the Thread that completes the last of the Futures, you can also parallize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again.
@ -266,4 +109,4 @@ This is just a sample of what can be done.
Exceptions Exceptions
---------- ----------
Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently. It doesn't matter if an ``UntypedActor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``, calling ``get`` will cause it to be thrown again so it can be handled properly. Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently. It doesn't matter if an ``UntypedActor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``, calling ``Await.result`` will cause it to be thrown again so it can be handled properly.

View file

@ -0,0 +1,248 @@
package akka.docs.future
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.matchers.MustMatchers
import akka.testkit._
import akka.actor.Actor
import akka.actor.Props
import akka.actor.Status.Failure
import akka.dispatch.Future
import akka.dispatch.Await
import akka.util.duration._
import akka.dispatch.Promise
object FutureDocSpec {
class MyActor extends Actor {
def receive = {
case x: String sender ! x.toUpperCase
case x: Int if x < 0 sender ! Failure(new ArithmeticException("Negative values not supported"))
case x: Int sender ! x
}
}
case object GetNext
class OddActor extends Actor {
var n = 1
def receive = {
case GetNext
sender ! n
n += 2
}
}
}
class FutureDocSpec extends AkkaSpec {
import FutureDocSpec._
"demonstrate usage of blocking from actor" in {
val actor = system.actorOf(Props[MyActor])
val msg = "hello"
//#ask-blocking
import akka.dispatch.Await
implicit val timeout = system.settings.ActorTimeout
val future = actor ? msg
val result = Await.result(future, timeout.duration).asInstanceOf[String]
//#ask-blocking
result must be("HELLO")
}
"demonstrate usage of mapTo" in {
val actor = system.actorOf(Props[MyActor])
val msg = "hello"
implicit val timeout = system.settings.ActorTimeout
//#map-to
import akka.dispatch.Future
val future: Future[String] = (actor ? msg).mapTo[String]
//#map-to
Await.result(future, timeout.duration) must be("HELLO")
}
"demonstrate usage of simple future eval" in {
//#future-eval
import akka.dispatch.Await
import akka.dispatch.Future
import akka.util.duration._
implicit def dispatcher = system.dispatcher
val future = Future {
"Hello" + "World"
}
val result = Await.result(future, 1 second)
//#future-eval
result must be("HelloWorld")
}
"demonstrate usage of map" in {
//#map
val f1 = Future {
"Hello" + "World"
}
val f2 = f1 map { x
x.length
}
val result = Await.result(f2, 1 second)
result must be(10)
f1.value must be(Some(Right("HelloWorld")))
//#map
}
"demonstrate wrong usage of nested map" in {
//#wrong-nested-map
val f1 = Future {
"Hello" + "World"
}
val f2 = Promise.successful(3)
val f3 = f1 map { x
f2 map { y
x.length * y
}
}
//#wrong-nested-map
Await.ready(f3, 1 second)
}
"demonstrate usage of flatMap" in {
//#flat-map
val f1 = Future {
"Hello" + "World"
}
val f2 = Promise.successful(3)
val f3 = f1 flatMap { x
f2 map { y
x.length * y
}
}
val result = Await.result(f3, 1 second)
result must be(30)
//#flat-map
}
"demonstrate usage of for comprehension" in {
//#for-comprehension
val f = for {
a Future(10 / 2) // 10 / 2 = 5
b Future(a + 1) // 5 + 1 = 6
c Future(a - 1) // 5 - 1 = 4
} yield b * c // 6 * 4 = 24
// Note that the execution of futures a, b, and c
// are not done in parallel.
val result = Await.result(f, 1 second)
result must be(24)
//#for-comprehension
}
"demonstrate wrong way of composing" in {
val actor1 = system.actorOf(Props[MyActor])
val actor2 = system.actorOf(Props[MyActor])
val actor3 = system.actorOf(Props[MyActor])
val msg1 = 1
val msg2 = 2
implicit val timeout = system.settings.ActorTimeout
import akka.dispatch.Await
//#composing-wrong
val f1 = actor1 ? msg1
val f2 = actor2 ? msg2
val a = Await.result(f1, 1 second).asInstanceOf[Int]
val b = Await.result(f2, 1 second).asInstanceOf[Int]
val f3 = actor3 ? (a + b)
val result = Await.result(f3, 1 second).asInstanceOf[Int]
//#composing-wrong
result must be(3)
}
"demonstrate composing" in {
val actor1 = system.actorOf(Props[MyActor])
val actor2 = system.actorOf(Props[MyActor])
val actor3 = system.actorOf(Props[MyActor])
val msg1 = 1
val msg2 = 2
implicit val timeout = system.settings.ActorTimeout
import akka.dispatch.Await
//#composing
val f1 = actor1 ? msg1
val f2 = actor2 ? msg2
val f3 = for {
a f1.mapTo[Int]
b f2.mapTo[Int]
c (actor3 ? (a + b)).mapTo[Int]
} yield c
val result = Await.result(f3, 1 second).asInstanceOf[Int]
//#composing
result must be(3)
}
"demonstrate usage of sequence with actors" in {
implicit val timeout = system.settings.ActorTimeout
val oddActor = system.actorOf(Props[OddActor])
//#sequence-ask
// oddActor returns odd numbers sequentially from 1 as a List[Future[Int]]
val listOfFutures = List.fill(100)((oddActor ? GetNext).mapTo[Int])
// now we have a Future[List[Int]]
val futureList = Future.sequence(listOfFutures)
// Find the sum of the odd numbers
val oddSum = Await.result(futureList.map(_.sum), 1 second).asInstanceOf[Int]
oddSum must be(10000)
//#sequence-ask
}
"demonstrate usage of sequence" in {
//#sequence
val futureList = Future.sequence((1 to 100).toList.map(x Future(x * 2 - 1)))
val oddSum = Await.result(futureList.map(_.sum), 1 second).asInstanceOf[Int]
oddSum must be(10000)
//#sequence
}
"demonstrate usage of traverse" in {
//#traverse
val futureList = Future.traverse((1 to 100).toList)(x Future(x * 2 - 1))
val oddSum = Await.result(futureList.map(_.sum), 1 second).asInstanceOf[Int]
oddSum must be(10000)
//#traverse
}
"demonstrate usage of fold" in {
//#fold
val futures = for (i 1 to 1000) yield Future(i * 2) // Create a sequence of Futures
val futureSum = Future.fold(futures)(0)(_ + _)
Await.result(futureSum, 1 second) must be(1001000)
//#fold
}
"demonstrate usage of reduce" in {
//#reduce
val futures = for (i 1 to 1000) yield Future(i * 2) // Create a sequence of Futures
val futureSum = Future.reduce(futures)(_ + _)
Await.result(futureSum, 1 second) must be(1001000)
//#reduce
}
"demonstrate usage of recover" in {
implicit val timeout = system.settings.ActorTimeout
val actor = system.actorOf(Props[MyActor])
val msg1 = -1
//#recover
val future = actor ? msg1 recover {
case e: ArithmeticException 0
}
//#recover
Await.result(future, 1 second) must be(0)
}
}

View file

@ -19,202 +19,114 @@ There are generally two ways of getting a reply from an ``Actor``: the first is
Using an ``Actor``\'s ``?`` method to send a message will return a Future. To wait for and retrieve the actual result the simplest method is: Using an ``Actor``\'s ``?`` method to send a message will return a Future. To wait for and retrieve the actual result the simplest method is:
.. code-block:: scala .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: ask-blocking
val future = actor ? msg This will cause the current thread to block and wait for the ``Actor`` to 'complete' the ``Future`` with it's reply. Blocking is discouraged though as it can cause performance problem.
val result = future.get() The blocking operations are located in ``Await.result`` and ``Await.ready`` to make it easy to spot where blocking occurs. Alternatives to blocking are discussed further within this documentation.
Also note that the ``Future`` returned by an ``Actor`` is a ``Future[Any]`` since an ``Actor`` is dynamic. That is why the ``asInstanceOf`` is used in the above sample.
When using non-blocking it is better to use the ``mapTo`` method to safely try to cast a ``Future`` to an expected type:
This will cause the current thread to block and wait for the ``Actor`` to 'complete' the ``Future`` with it's reply. Blocking is discouraged though as it can cause performance problem. Alternatives to blocking are discussed futher within this documentation. Also note that the ``Future`` returned by an ``Actor`` is a ``Future[Any]`` since an ``Actor`` is dynamic. To safely try to cast a ``Future`` to an expected type the ``mapTo`` method may be used: .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: map-to
.. code-block:: scala The ``mapTo`` method will return a new ``Future`` that contains the result if the cast was successful, or a ``ClassCastException`` if not. Handling ``Exception``\s will be discussed further within this documentation.
val future = actor ? msg
val result = future.mapTo[String].get()
The ``mapTo`` method will return a new ``Future`` that contains the result if the cast was successful, or a ``ClassCastException`` if not. Handling ``Exception``\s will be disccused further within this documentation.
Use Directly Use Directly
------------ ------------
A common use case within Akka is to have some computation performed concurrently without needing the extra utility of an ``Actor``. If you find yourself creating a pool of ``Actor``\s for the sole reason of performing a calculation in parallel, there is an easier (and faster) way: A common use case within Akka is to have some computation performed concurrently without needing the extra utility of an ``Actor``. If you find yourself creating a pool of ``Actor``\s for the sole reason of performing a calculation in parallel, there is an easier (and faster) way:
.. code-block:: scala .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: future-eval
import akka.dispatch.Future
val future = Future {
"Hello" + "World"
}
val result = future.get()
In the above code the block passed to ``Future`` will be executed by the default ``Dispatcher``, with the return value of the block used to complete the ``Future`` (in this case, the result would be the string: "HelloWorld"). Unlike a ``Future`` that is returned from an ``Actor``, this ``Future`` is properly typed, and we also avoid the overhead of managing an ``Actor``. In the above code the block passed to ``Future`` will be executed by the default ``Dispatcher``, with the return value of the block used to complete the ``Future`` (in this case, the result would be the string: "HelloWorld"). Unlike a ``Future`` that is returned from an ``Actor``, this ``Future`` is properly typed, and we also avoid the overhead of managing an ``Actor``.
Functional Futures Functional Futures
------------------ ------------------
A recent addition to Akka's ``Future`` is several monadic methods that are very similar to the ones used by Scala's collections. These allow you to create 'pipelines' or 'streams' that the result will travel through. Akka's ``Future`` has several monadic methods that are very similar to the ones used by Scala's collections. These allow you to create 'pipelines' or 'streams' that the result will travel through.
Future is a Monad Future is a Monad
^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^
The first method for working with ``Future`` functionally is ``map``. This method takes a ``Function`` which performs some operation on the result of the ``Future``, and returning a new result. The return value of the ``map`` method is another ``Future`` that will contain the new result: The first method for working with ``Future`` functionally is ``map``. This method takes a ``Function`` which performs some operation on the result of the ``Future``, and returning a new result. The return value of the ``map`` method is another ``Future`` that will contain the new result:
.. code-block:: scala .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: map
val f1 = Future { In this example we are joining two strings together within a ``Future``. Instead of waiting for this to complete, we apply our function that calculates the length of the string using the ``map`` method. Now we have a second ``Future`` that will eventually contain an ``Int``. When our original ``Future`` completes, it will also apply our function and complete the second ``Future`` with it's result. When we finally get the result, it will contain the number 10. Our original ``Future`` still contains the string "HelloWorld" and is unaffected by the ``map``.
"Hello" + "World"
}
val f2 = f1 map { x =>
x.length
}
val result = f2.get()
In this example we are joining two strings together within a Future. Instead of waiting for this to complete, we apply our function that calculates the length of the string using the ``map`` method. Now we have a second ``Future`` that will eventually contain an ``Int``. When our original ``Future`` completes, it will also apply our function and complete the second ``Future`` with it's result. When we finally ``get`` the result, it will contain the number 10. Our original ``Future`` still contains the string "HelloWorld" and is unaffected by the ``map``.
The ``map`` method is fine if we are modifying a single ``Future``, but if 2 or more ``Future``\s are involved ``map`` will not allow you to combine them together: The ``map`` method is fine if we are modifying a single ``Future``, but if 2 or more ``Future``\s are involved ``map`` will not allow you to combine them together:
.. code-block:: scala .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: wrong-nested-map
val f1 = Future { ``f3`` is a ``Future[Future[Int]]`` instead of the desired ``Future[Int]``. Instead, the ``flatMap`` method should be used:
"Hello" + "World"
}
val f2 = Future { .. includecode:: code/akka/docs/future/FutureDocSpec.scala
3 :include: flat-map
}
val f3 = f1 map { x =>
f2 map { y =>
x.length * y
}
}
val result = f3.get().get()
The ``get`` method had to be used twice because ``f3`` is a ``Future[Future[Int]]`` instead of the desired ``Future[Int]``. Instead, the ``flatMap`` method should be used:
.. code-block:: scala
val f1 = Future {
"Hello" + "World"
}
val f2 = Future {
3
}
val f3 = f1 flatMap { x =>
f2 map { y =>
x.length * y
}
}
val result = f3.get()
For Comprehensions For Comprehensions
^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^
Since ``Future`` has a ``map`` and ``flatMap`` method it can be easily used in a 'for comprehension': Since ``Future`` has a ``map`` and ``flatMap`` method it can be easily used in a 'for comprehension':
.. code-block:: scala .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: for-comprehension
val f = for {
a <- Future(10 / 2) // 10 / 2 = 5
b <- Future(a + 1) // 5 + 1 = 6
c <- Future(a - 1) // 5 - 1 = 4
} yield b * c // 6 * 4 = 24
val result = f.get()
Something to keep in mind when doing this is even though it looks like parts of the above example can run in parallel, each step of the for comprehension is run sequentially. This will happen on separate threads for each step but there isn't much benefit over running the calculations all within a single ``Future``. The real benefit comes when the ``Future``\s are created first, and then combining them together. Something to keep in mind when doing this is even though it looks like parts of the above example can run in parallel, each step of the for comprehension is run sequentially. This will happen on separate threads for each step but there isn't much benefit over running the calculations all within a single ``Future``. The real benefit comes when the ``Future``\s are created first, and then combining them together.
Composing Futures Composing Futures
^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^
The example for comprehension above is an example of composing ``Future``\s. A common use case for this is combining the replies of several ``Actor``\s into a single calculation without resorting to calling ``get`` or ``await`` to block for each result. First an example of using ``get``: The example for comprehension above is an example of composing ``Future``\s. A common use case for this is combining the replies of several ``Actor``\s into a single calculation without resorting to calling ``Await.result`` or ``Await.ready`` to block for each result. First an example of using ``Await.result``:
.. code-block:: scala .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: composing-wrong
val f1 = actor1 ? msg1 Here we wait for the results from the first 2 ``Actor``\s before sending that result to the third ``Actor``. We called ``Await.result`` 3 times, which caused our little program to block 3 times before getting our final result. Now compare that to this example:
val f2 = actor2 ? msg2
val a = f1.mapTo[Int].get() .. includecode:: code/akka/docs/future/FutureDocSpec.scala
val b = f2.mapTo[Int].get() :include: composing
val f3 = actor3 ? (a + b)
val result = f3.mapTo[String].get()
Here we wait for the results from the first 2 ``Actor``\s before sending that result to the third ``Actor``. We called ``get`` 3 times, which caused our little program to block 3 times before getting our final result. Now compare that to this example:
.. code-block:: scala
val f1 = actor1 ? msg1
val f2 = actor2 ? msg2
val f3 = for {
a <- f1.mapTo[Int]
b <- f2.mapTo[Int]
c <- (actor3 ? (a + b)).mapTo[String]
} yield c
val result = f3.get()
Here we have 2 actors processing a single message each. Once the 2 results are available (note that we don't block to get these results!), they are being added together and sent to a third ``Actor``, which replies with a string, which we assign to 'result'. Here we have 2 actors processing a single message each. Once the 2 results are available (note that we don't block to get these results!), they are being added together and sent to a third ``Actor``, which replies with a string, which we assign to 'result'.
This is fine when dealing with a known amount of Actors, but can grow unwieldy if we have more then a handful. The ``sequence`` and ``traverse`` helper methods can make it easier to handle more complex use cases. Both of these methods are ways of turning, for a subclass ``T`` of ``Traversable``, ``T[Future[A]]`` into a ``Future[T[A]]``. For example: This is fine when dealing with a known amount of Actors, but can grow unwieldy if we have more then a handful. The ``sequence`` and ``traverse`` helper methods can make it easier to handle more complex use cases. Both of these methods are ways of turning, for a subclass ``T`` of ``Traversable``, ``T[Future[A]]`` into a ``Future[T[A]]``. For example:
.. code-block:: scala .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: sequence-ask
// oddActor returns odd numbers sequentially from 1 as a List[Future[Int]]
val listOfFutures = List.fill(100)((oddActor ? GetNext).mapTo[Int])
// now we have a Future[List[Int]]
val futureList = Future.sequence(listOfFutures)
// Find the sum of the odd numbers
val oddSum = futureList.map(_.sum).get()
To better explain what happened in the example, ``Future.sequence`` is taking the ``List[Future[Int]]`` and turning it into a ``Future[List[Int]]``. We can then use ``map`` to work with the ``List[Int]`` directly, and we find the sum of the ``List``. To better explain what happened in the example, ``Future.sequence`` is taking the ``List[Future[Int]]`` and turning it into a ``Future[List[Int]]``. We can then use ``map`` to work with the ``List[Int]`` directly, and we find the sum of the ``List``.
The ``traverse`` method is similar to ``sequence``, but it takes a ``T[A]`` and a function ``A => Future[B]`` to return a ``Future[T[B]]``, where ``T`` is again a subclass of Traversable. For example, to use ``traverse`` to sum the first 100 odd numbers: The ``traverse`` method is similar to ``sequence``, but it takes a ``T[A]`` and a function ``A => Future[B]`` to return a ``Future[T[B]]``, where ``T`` is again a subclass of Traversable. For example, to use ``traverse`` to sum the first 100 odd numbers:
.. code-block:: scala .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: traverse
val oddSum = Future.traverse((1 to 100).toList)(x => Future(x * 2 - 1)).map(_.sum).get()
This is the same result as this example: This is the same result as this example:
.. code-block:: scala .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: sequence
val oddSum = Future.sequence((1 to 100).toList.map(x => Future(x * 2 - 1))).map(_.sum).get()
But it may be faster to use ``traverse`` as it doesn't have to create an intermediate ``List[Future[Int]]``. But it may be faster to use ``traverse`` as it doesn't have to create an intermediate ``List[Future[Int]]``.
Then there's a method that's called ``fold`` that takes a start-value, a sequence of ``Future``\s and a function from the type of the start-value and the type of the futures and returns something with the same type as the start-value, and then applies the function to all elements in the sequence of futures, non-blockingly, the execution will run on the Thread of the last completing Future in the sequence. Then there's a method that's called ``fold`` that takes a start-value, a sequence of ``Future``\s and a function from the type of the start-value and the type of the futures and returns something with the same type as the start-value, and then applies the function to all elements in the sequence of futures, non-blockingly, the execution will run on the Thread of the last completing Future in the sequence.
.. code-block:: scala .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: fold
val futures = for(i <- 1 to 1000) yield Future(i * 2) // Create a sequence of Futures
val futureSum = Future.fold(0)(futures)(_ + _)
That's all it takes! That's all it takes!
If the sequence passed to ``fold`` is empty, it will return the start-value, in the case above, that will be 0. In some cases you don't have a start-value and you're able to use the value of the first completing Future in the sequence as the start-value, you can use ``reduce``, it works like this: If the sequence passed to ``fold`` is empty, it will return the start-value, in the case above, that will be 0. In some cases you don't have a start-value and you're able to use the value of the first completing Future in the sequence as the start-value, you can use ``reduce``, it works like this:
.. code-block:: scala .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: reduce
val futures = for(i <- 1 to 1000) yield Future(i * 2) // Create a sequence of Futures Same as with ``fold``, the execution will be done by the Thread that completes the last of the Futures, you can also parallelize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again.
val futureSum = Future.reduce(futures)(_ + _) This is just a sample of what can be done, but to use more advanced techniques it is easier to take advantage of Scalaz.
Same as with ``fold``, the execution will be done by the Thread that completes the last of the Futures, you can also parallize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again.
This is just a sample of what can be done, but to use more advanced techniques it is easier to take advantage of Scalaz, which Akka has support for in its akka-scalaz module.
Scalaz Scalaz
@ -229,58 +141,12 @@ complete support of programming in a functional style.
Exceptions Exceptions
---------- ----------
Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently. It doesn't matter if an ``Actor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``, calling ``get`` will cause it to be thrown again so it can be handled properly. Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently. It doesn't matter if an ``Actor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``, calling ``Await.result`` will cause it to be thrown again so it can be handled properly.
It is also possible to handle an ``Exception`` by returning a different result. This is done with the ``recover`` method. For example: It is also possible to handle an ``Exception`` by returning a different result. This is done with the ``recover`` method. For example:
.. code-block:: scala .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: recover
val future = actor ? msg1 recover { In this example, if the actor replied with a ``akka.actor.Status.Failure`` containing the ``ArithmeticException``, our ``Future`` would have a result of 0. The ``recover`` method works very similarly to the standard try/catch blocks, so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way it will be behave as if we hadn't used the ``recover`` method.
case e: ArithmeticException => 0
}
In this example, if an ``ArithmeticException`` was thrown while the ``Actor`` processed the message, our ``Future`` would have a result of 0. The ``recover`` method works very similarly to the standard try/catch blocks, so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way it will be behave as if we hadn't used the ``recover`` method.
Timeouts
--------
Waiting forever for a ``Future`` to be completed can be dangerous. It could cause your program to block indefinitly or produce a memory leak. ``Future`` has support for a timeout already builtin with a default of 5 seconds (taken from :ref:`configuration`). A timeout is an instance of ``akka.util.Timeout`` which contains an ``akka.util.Duration``. A ``Duration`` can be finite, which needs a length and unit type, or infinite. An infinite ``Timeout`` can be dangerous since it will never actually expire.
A different ``Timeout`` can be supplied either explicitly or implicitly when a ``Future`` is created. An implicit ``Timeout`` has the benefit of being usable by a for-comprehension as well as being picked up by any methods looking for an implicit ``Timeout``, while an explicit ``Timeout`` can be used in a more controlled manner.
Explicit ``Timeout`` example:
.. code-block:: scala
import akka.util.duration._
val future1 = Future( { runSomething }, 1 second)
val future2 = future1.map(doSomethingElse)(1500 millis)
Implicit ``Timeout`` example:
.. code-block:: scala
import akka.util.Timeout
import akka.util.duration._
implicit val longTimeout = Timeout(1 minute)
val future1 = Future { runSomething }
val future2 = future1 map doSomethingElse
An important note: when explicitly providing a ``Timeout`` it is fine to just use a ``Duration`` (like in the above explicit ``Timeout`` example). An implicit ``Duration`` will be ignored if an implicit ``Timeout`` is required. Due to this, in the above implicit example the ``Duration`` is wrapped within a ``Timeout``.
If the timeout is reached the ``Future`` becomes unusable, even if an attempt is made to complete it. It is possible to have a ``Future`` handle a timeout, if needed, with the ``onTimeout`` and ``orElse`` methods:
.. code-block:: scala
val future1 = actor ? msg onTimeout { _ =>
println("Timed out!")
}
val future2 = actor ? msg orElse "Timed out!"
Using ``onTimeout`` will cause the supplied block to be executed if the ``Future`` expires, while ``orElse`` will complete the ``Future`` with the supplied value if the ``Future`` expires.