DOC: Update Future (Java) Chapter. See #1487
This commit is contained in:
parent
3b6c3e28d3
commit
da24cb03fd
4 changed files with 360 additions and 187 deletions
5
akka-docs/java/code/akka/docs/future/FutureDocTest.scala
Normal file
5
akka-docs/java/code/akka/docs/future/FutureDocTest.scala
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
package akka.docs.future
|
||||||
|
|
||||||
|
import org.scalatest.junit.JUnitSuite
|
||||||
|
|
||||||
|
class FutureDocTest extends FutureDocTestBase with JUnitSuite
|
||||||
325
akka-docs/java/code/akka/docs/future/FutureDocTestBase.java
Normal file
325
akka-docs/java/code/akka/docs/future/FutureDocTestBase.java
Normal file
|
|
@ -0,0 +1,325 @@
|
||||||
|
package akka.docs.future;
|
||||||
|
|
||||||
|
//#imports1
|
||||||
|
import akka.util.Timeout;
|
||||||
|
import akka.dispatch.Await;
|
||||||
|
import akka.dispatch.Future;
|
||||||
|
|
||||||
|
//#imports1
|
||||||
|
|
||||||
|
//#imports2
|
||||||
|
import akka.util.Duration;
|
||||||
|
import akka.japi.Function;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import static akka.dispatch.Futures.future;
|
||||||
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
|
||||||
|
//#imports2
|
||||||
|
|
||||||
|
//#imports3
|
||||||
|
import static akka.dispatch.Futures.sequence;
|
||||||
|
|
||||||
|
//#imports3
|
||||||
|
|
||||||
|
//#imports4
|
||||||
|
import static akka.dispatch.Futures.traverse;
|
||||||
|
|
||||||
|
//#imports4
|
||||||
|
|
||||||
|
//#imports5
|
||||||
|
import akka.japi.Function2;
|
||||||
|
import static akka.dispatch.Futures.fold;
|
||||||
|
|
||||||
|
//#imports5
|
||||||
|
|
||||||
|
//#imports6
|
||||||
|
import static akka.dispatch.Futures.reduce;
|
||||||
|
|
||||||
|
//#imports6
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import akka.testkit.AkkaSpec;
|
||||||
|
import akka.actor.Status.Failure;
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.actor.UntypedActor;
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import akka.docs.actor.MyUntypedActor;
|
||||||
|
import akka.actor.Props;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class FutureDocTestBase {
|
||||||
|
|
||||||
|
ActorSystem system;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
system = ActorSystem.create("MySystem", AkkaSpec.testConf());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
system.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void useBlockingFromActor() {
|
||||||
|
ActorRef actor = system.actorOf(new Props(MyActor.class));
|
||||||
|
String msg = "hello";
|
||||||
|
//#ask-blocking
|
||||||
|
Timeout timeout = system.settings().ActorTimeout();
|
||||||
|
Future<Object> future = actor.ask(msg, timeout);
|
||||||
|
String result = (String) Await.result(future, timeout.duration());
|
||||||
|
//#ask-blocking
|
||||||
|
assertEquals("HELLO", result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void useFutureEval() {
|
||||||
|
//#future-eval
|
||||||
|
Future<String> f = future(new Callable<String>() {
|
||||||
|
public String call() {
|
||||||
|
return "Hello" + "World";
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
String result = (String) Await.result(f, Duration.create(1, SECONDS));
|
||||||
|
//#future-eval
|
||||||
|
assertEquals("HelloWorld", result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void useMap() {
|
||||||
|
//#map
|
||||||
|
Future<String> f1 = future(new Callable<String>() {
|
||||||
|
public String call() {
|
||||||
|
return "Hello" + "World";
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
|
||||||
|
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
|
||||||
|
public Integer apply(String s) {
|
||||||
|
return s.length();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
int result = Await.result(f2, Duration.create(1, SECONDS));
|
||||||
|
assertEquals(10, result);
|
||||||
|
//#map
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void useMap2() throws Exception {
|
||||||
|
//#map2
|
||||||
|
Future<String> f1 = future(new Callable<String>() {
|
||||||
|
public String call() throws Exception {
|
||||||
|
Thread.sleep(100);
|
||||||
|
return "Hello" + "World";
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
|
||||||
|
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
|
||||||
|
public Integer apply(String s) {
|
||||||
|
return s.length();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
//#map2
|
||||||
|
int result = Await.result(f2, Duration.create(1, SECONDS));
|
||||||
|
assertEquals(10, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void useMap3() throws Exception {
|
||||||
|
//#map3
|
||||||
|
Future<String> f1 = future(new Callable<String>() {
|
||||||
|
public String call() {
|
||||||
|
return "Hello" + "World";
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
|
||||||
|
Thread.sleep(100);
|
||||||
|
|
||||||
|
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
|
||||||
|
public Integer apply(String s) {
|
||||||
|
return s.length();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
//#map3
|
||||||
|
int result = Await.result(f2, Duration.create(1, SECONDS));
|
||||||
|
assertEquals(10, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void useFlatMap() {
|
||||||
|
//#flat-map
|
||||||
|
Future<String> f1 = future(new Callable<String>() {
|
||||||
|
public String call() {
|
||||||
|
return "Hello" + "World";
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
|
||||||
|
Future<Integer> f2 = f1.flatMap(new Function<String, Future<Integer>>() {
|
||||||
|
public Future<Integer> apply(final String s) {
|
||||||
|
return future(new Callable<Integer>() {
|
||||||
|
public Integer call() {
|
||||||
|
return s.length();
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
//#flat-map
|
||||||
|
int result = Await.result(f2, Duration.create(1, SECONDS));
|
||||||
|
assertEquals(10, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void useSequence() {
|
||||||
|
Future<Integer> f1 = future(new Callable<Integer>() {
|
||||||
|
public Integer call() {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
Future<Integer> f2 = future(new Callable<Integer>() {
|
||||||
|
public Integer call() {
|
||||||
|
return 2;
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
|
||||||
|
List<Future<Integer>> source = new ArrayList<Future<Integer>>();
|
||||||
|
source.add(f1);
|
||||||
|
source.add(f2);
|
||||||
|
|
||||||
|
//#sequence
|
||||||
|
//Some source generating a sequence of Future<Integer>:s
|
||||||
|
Iterable<Future<Integer>> listOfFutureInts = source;
|
||||||
|
|
||||||
|
// now we have a Future[Iterable[Integer]]
|
||||||
|
Future<Iterable<Integer>> futureListOfInts = sequence(listOfFutureInts, system.dispatcher());
|
||||||
|
|
||||||
|
// Find the sum of the odd numbers
|
||||||
|
Future<Long> futureSum = futureListOfInts.map(new Function<Iterable<Integer>, Long>() {
|
||||||
|
public Long apply(Iterable<Integer> ints) {
|
||||||
|
long sum = 0;
|
||||||
|
for (Integer i : ints)
|
||||||
|
sum += i;
|
||||||
|
return sum;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
long result = Await.result(futureSum, Duration.create(1, SECONDS));
|
||||||
|
//#sequence
|
||||||
|
assertEquals(3L, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void useTraverse() {
|
||||||
|
//#traverse
|
||||||
|
//Just a sequence of Strings
|
||||||
|
Iterable<String> listStrings = Arrays.asList("a", "b", "c");
|
||||||
|
|
||||||
|
Future<Iterable<String>> futureResult = traverse(listStrings, new Function<String, Future<String>>() {
|
||||||
|
public Future<String> apply(final String r) {
|
||||||
|
return future(new Callable<String>() {
|
||||||
|
public String call() {
|
||||||
|
return r.toUpperCase();
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
|
||||||
|
//Returns the sequence of strings as upper case
|
||||||
|
Iterable<String> result = Await.result(futureResult, Duration.create(1, SECONDS));
|
||||||
|
assertEquals(Arrays.asList("A", "B", "C"), result);
|
||||||
|
//#traverse
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void useFold() {
|
||||||
|
Future<String> f1 = future(new Callable<String>() {
|
||||||
|
public String call() {
|
||||||
|
return "a";
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
Future<String> f2 = future(new Callable<String>() {
|
||||||
|
public String call() {
|
||||||
|
return "b";
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
|
||||||
|
List<Future<String>> source = new ArrayList<Future<String>>();
|
||||||
|
source.add(f1);
|
||||||
|
source.add(f2);
|
||||||
|
//#fold
|
||||||
|
|
||||||
|
//A sequence of Futures, in this case Strings
|
||||||
|
Iterable<Future<String>> futures = source;
|
||||||
|
|
||||||
|
//Start value is the empty string
|
||||||
|
Future<String> resultFuture = fold("", futures, new Function2<String, String, String>() {
|
||||||
|
public String apply(String r, String t) {
|
||||||
|
return r + t; //Just concatenate
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
String result = Await.result(resultFuture, Duration.create(1, SECONDS));
|
||||||
|
//#fold
|
||||||
|
|
||||||
|
assertEquals("ab", result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void useReduce() {
|
||||||
|
Future<String> f1 = future(new Callable<String>() {
|
||||||
|
public String call() {
|
||||||
|
return "a";
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
Future<String> f2 = future(new Callable<String>() {
|
||||||
|
public String call() {
|
||||||
|
return "b";
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
|
||||||
|
List<Future<String>> source = new ArrayList<Future<String>>();
|
||||||
|
source.add(f1);
|
||||||
|
source.add(f2);
|
||||||
|
//#reduce
|
||||||
|
|
||||||
|
//A sequence of Futures, in this case Strings
|
||||||
|
Iterable<Future<String>> futures = source;
|
||||||
|
|
||||||
|
Future<String> resultFuture = reduce(futures, new Function2<String, String, String>() {
|
||||||
|
public String apply(String r, String t) {
|
||||||
|
return r + t; //Just concatenate
|
||||||
|
}
|
||||||
|
}, system.dispatcher());
|
||||||
|
|
||||||
|
String result = Await.result(resultFuture, Duration.create(1, SECONDS));
|
||||||
|
//#reduce
|
||||||
|
|
||||||
|
assertEquals("ab", result);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class MyActor extends UntypedActor {
|
||||||
|
public void onReceive(Object message) {
|
||||||
|
if (message instanceof String) {
|
||||||
|
getSender().tell(((String) message).toUpperCase());
|
||||||
|
} else if (message instanceof Integer) {
|
||||||
|
int i = ((Integer) message).intValue();
|
||||||
|
if (i < 0) {
|
||||||
|
getSender().tell(new Failure(new ArithmeticException("Negative values not supported")));
|
||||||
|
} else {
|
||||||
|
getSender().tell(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -11,154 +11,64 @@ Introduction
|
||||||
------------
|
------------
|
||||||
|
|
||||||
In Akka, a `Future <http://en.wikipedia.org/wiki/Futures_and_promises>`_ is a data structure used to retrieve the result of some concurrent operation. This operation is usually performed by an ``Actor`` or by the ``Dispatcher`` directly. This result can be accessed synchronously (blocking) or asynchronously (non-blocking).
|
In Akka, a `Future <http://en.wikipedia.org/wiki/Futures_and_promises>`_ is a data structure used to retrieve the result of some concurrent operation. This operation is usually performed by an ``Actor`` or by the ``Dispatcher`` directly. This result can be accessed synchronously (blocking) or asynchronously (non-blocking).
|
||||||
|
Asynchronous usage is strongly recommended.
|
||||||
|
|
||||||
Use with Actors
|
Use with Actors
|
||||||
---------------
|
---------------
|
||||||
|
|
||||||
There are generally two ways of getting a reply from an ``UntypedActor``: the first is by a sent message (``actorRef.tell(msg);``), which only works if the original sender was an ``UntypedActor``) and the second is through a ``Future``.
|
There are generally two ways of getting a reply from an ``UntypedActor``: the first is by a sent message (``actorRef.tell(msg)``), which only works if the original sender was an ``UntypedActor``) and the second is through a ``Future``.
|
||||||
|
|
||||||
Using the ``ActorRef``\'s ``ask`` method to send a message will return a Future. To wait for and retrieve the actual result the simplest method is:
|
Using the ``ActorRef``\'s ``ask`` method to send a message will return a Future. To wait for and retrieve the actual result the simplest method is:
|
||||||
|
|
||||||
.. code-block:: java
|
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
|
||||||
|
:include: imports1,ask-blocking
|
||||||
|
|
||||||
Future[Object] future = actorRef.ask[Object](msg);
|
This will cause the current thread to block and wait for the ``UntypedActor`` to 'complete' the ``Future`` with it's reply. Blocking is discouraged though as it can cause performance problem. Alternatives to blocking are discussed further within this documentation.
|
||||||
Object result = future.get(); //Block until result is available, usually bad practice
|
Also note that the ``Future`` returned by an ``UntypedActor`` is a ``Future<Object>`` since an ``UntypedActor`` is dynamic. That is why the cast to ``String`` is used in the above sample.
|
||||||
|
|
||||||
This will cause the current thread to block and wait for the ``UntypedActor`` to 'complete' the ``Future`` with it's reply. Due to the dynamic nature of Akka's ``UntypedActor``\s this result can be anything. The safest way to deal with this is to specify the result to an ``Object`` as is shown in the above example. You can also use the expected result type instead of ``Any``, but if an unexpected type were to be returned you will get a ``ClassCastException``. For more elegant ways to deal with this and to use the result without blocking, refer to `Functional Futures`_.
|
|
||||||
|
|
||||||
Use Directly
|
Use Directly
|
||||||
------------
|
------------
|
||||||
|
|
||||||
A common use case within Akka is to have some computation performed concurrently without needing the extra utility of an ``UntypedActor``. If you find yourself creating a pool of ``UntypedActor``\s for the sole reason of performing a calculation in parallel, there is an easier (and faster) way:
|
A common use case within Akka is to have some computation performed concurrently without needing the extra utility of an ``UntypedActor``. If you find yourself creating a pool of ``UntypedActor``\s for the sole reason of performing a calculation in parallel, there is an easier (and faster) way:
|
||||||
|
|
||||||
.. code-block:: java
|
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
|
||||||
|
:include: imports2,future-eval
|
||||||
import akka.dispatch.Future;
|
|
||||||
import static akka.dispatch.Futures.future;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
|
|
||||||
Future<String> f = future(new Callable<String>() {
|
|
||||||
public String call() {
|
|
||||||
return "Hello" + "World!";
|
|
||||||
}
|
|
||||||
});
|
|
||||||
String result = f.get(); //Blocks until timeout, default timeout is set in :ref:`configuration`, otherwise 5 seconds
|
|
||||||
|
|
||||||
In the above code the block passed to ``future`` will be executed by the default ``Dispatcher``, with the return value of the block used to complete the ``Future`` (in this case, the result would be the string: "HelloWorld"). Unlike a ``Future`` that is returned from an ``UntypedActor``, this ``Future`` is properly typed, and we also avoid the overhead of managing an ``UntypedActor``.
|
In the above code the block passed to ``future`` will be executed by the default ``Dispatcher``, with the return value of the block used to complete the ``Future`` (in this case, the result would be the string: "HelloWorld"). Unlike a ``Future`` that is returned from an ``UntypedActor``, this ``Future`` is properly typed, and we also avoid the overhead of managing an ``UntypedActor``.
|
||||||
|
|
||||||
Functional Futures
|
Functional Futures
|
||||||
------------------
|
------------------
|
||||||
|
|
||||||
A recent addition to Akka's ``Future`` is several monadic methods that are very similar to the ones used by ``Scala``'s collections. These allow you to create 'pipelines' or 'streams' that the result will travel through.
|
Akka's ``Future`` has several monadic methods that are very similar to the ones used by ``Scala``'s collections. These allow you to create 'pipelines' or 'streams' that the result will travel through.
|
||||||
|
|
||||||
Future is a Monad
|
Future is a Monad
|
||||||
^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
The first method for working with ``Future`` functionally is ``map``. This method takes a ``Function`` which performs some operation on the result of the ``Future``, and returning a new result. The return value of the ``map`` method is another ``Future`` that will contain the new result:
|
The first method for working with ``Future`` functionally is ``map``. This method takes a ``Function`` which performs some operation on the result of the ``Future``, and returning a new result. The return value of the ``map`` method is another ``Future`` that will contain the new result:
|
||||||
|
|
||||||
.. code-block:: java
|
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
|
||||||
|
:include: imports2,map
|
||||||
import akka.dispatch.Future;
|
|
||||||
import static akka.dispatch.Futures.future;
|
|
||||||
import static akka.japi.Function;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
|
|
||||||
Future<String> f1 = future(new Callable<String>() {
|
|
||||||
public String call() {
|
|
||||||
return "Hello" + "World";
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
|
|
||||||
public Integer apply(String s) {
|
|
||||||
return s.length();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Integer result = f2.get();
|
|
||||||
|
|
||||||
In this example we are joining two strings together within a Future. Instead of waiting for f1 to complete, we apply our function that calculates the length of the string using the ``map`` method. Now we have a second Future, f2, that will eventually contain an ``Integer``. When our original ``Future``, f1, completes, it will also apply our function and complete the second Future with it's result. When we finally ``get`` the result, it will contain the number 10. Our original Future still contains the string "HelloWorld" and is unaffected by the ``map``.
|
In this example we are joining two strings together within a Future. Instead of waiting for f1 to complete, we apply our function that calculates the length of the string using the ``map`` method. Now we have a second Future, f2, that will eventually contain an ``Integer``. When our original ``Future``, f1, completes, it will also apply our function and complete the second Future with it's result. When we finally ``get`` the result, it will contain the number 10. Our original Future still contains the string "HelloWorld" and is unaffected by the ``map``.
|
||||||
|
|
||||||
Something to note when using these methods: if the ``Future`` is still being processed when one of these methods are called, it will be the completing thread that actually does the work. If the ``Future`` is already complete though, it will be run in our current thread. For example:
|
Something to note when using these methods: if the ``Future`` is still being processed when one of these methods are called, it will be the completing thread that actually does the work. If the ``Future`` is already complete though, it will be run in our current thread. For example:
|
||||||
|
|
||||||
.. code-block:: java
|
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
|
||||||
|
:include: map2
|
||||||
|
|
||||||
import akka.dispatch.Future;
|
The original ``Future`` will take at least 0.1 second to execute now, which means it is still being processed at the time we call ``map``. The function we provide gets stored within the ``Future`` and later executed automatically by the dispatcher when the result is ready.
|
||||||
import static akka.dispatch.Futures.future;
|
|
||||||
import static akka.japi.Function;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
|
|
||||||
Future<String> f1 = future(new Callable<String>() {
|
|
||||||
public String call() {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
return "Hello" + "World";
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
|
|
||||||
public Integer apply(String s) {
|
|
||||||
return s.length();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Integer result = f2.get();
|
|
||||||
|
|
||||||
The original ``Future`` will take at least 1 second to execute now, which means it is still being processed at the time we call ``map``. The function we provide gets stored within the ``Future`` and later executed automatically by the dispatcher when the result is ready.
|
|
||||||
|
|
||||||
If we do the opposite:
|
If we do the opposite:
|
||||||
|
|
||||||
.. code-block:: java
|
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
|
||||||
|
:include: map3
|
||||||
|
|
||||||
import akka.dispatch.Future;
|
Our little string has been processed long before our 0.1 second sleep has finished. Because of this, the dispatcher has moved onto other messages that need processing and can no longer calculate the length of the string for us, instead it gets calculated in the current thread just as if we weren't using a ``Future``.
|
||||||
import static akka.dispatch.Futures.future;
|
|
||||||
import static akka.japi.Function;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
|
|
||||||
Future<String> f1 = future(new Callable<String>() {
|
|
||||||
public String call() {
|
|
||||||
return "Hello" + "World";
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Thread.sleep(1000);
|
|
||||||
|
|
||||||
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
|
|
||||||
public Integer apply(String s) {
|
|
||||||
return s.length();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Integer result = f2.get();
|
|
||||||
|
|
||||||
Our little string has been processed long before our 1 second sleep has finished. Because of this, the dispatcher has moved onto other messages that need processing and can no longer calculate the length of the string for us, instead it gets calculated in the current thread just as if we weren't using a ``Future``.
|
|
||||||
|
|
||||||
Normally this works quite well as it means there is very little overhead to running a quick function. If there is a possibility of the function taking a non-trivial amount of time to process it might be better to have this done concurrently, and for that we use ``flatMap``:
|
Normally this works quite well as it means there is very little overhead to running a quick function. If there is a possibility of the function taking a non-trivial amount of time to process it might be better to have this done concurrently, and for that we use ``flatMap``:
|
||||||
|
|
||||||
.. code-block:: java
|
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
|
||||||
|
:include: flat-map
|
||||||
import akka.dispatch.Future;
|
|
||||||
import static akka.dispatch.Futures.future;
|
|
||||||
import static akka.japi.Function;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
|
|
||||||
Future<String> f1 = future(new Callable<String>() {
|
|
||||||
public String call() {
|
|
||||||
return "Hello" + "World";
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Future<Integer> f2 = f1.flatMap(new Function<String, Future<Integer>>() {
|
|
||||||
public Future<Integer> apply(final String s) {
|
|
||||||
return future(
|
|
||||||
new Callable<Integer>() {
|
|
||||||
public Integer call() {
|
|
||||||
return s.length();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Integer result = f2.get();
|
|
||||||
|
|
||||||
Now our second Future is executed concurrently as well. This technique can also be used to combine the results of several Futures into a single calculation, which will be better explained in the following sections.
|
Now our second Future is executed concurrently as well. This technique can also be used to combine the results of several Futures into a single calculation, which will be better explained in the following sections.
|
||||||
|
|
||||||
|
|
@ -167,97 +77,30 @@ Composing Futures
|
||||||
|
|
||||||
It is very often desirable to be able to combine different Futures with eachother, below are some examples on how that can be done in a non-blocking fashion.
|
It is very often desirable to be able to combine different Futures with eachother, below are some examples on how that can be done in a non-blocking fashion.
|
||||||
|
|
||||||
.. code-block:: java
|
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
|
||||||
|
:include: imports3,sequence
|
||||||
import akka.dispatch.Future;
|
|
||||||
import static akka.dispatch.Futures.sequence;
|
|
||||||
import akka.japi.Function;
|
|
||||||
import java.lang.Iterable;
|
|
||||||
|
|
||||||
Iterable<Future<Integer>> listOfFutureInts = ... //Some source generating a sequence of Future<Integer>:s
|
|
||||||
|
|
||||||
// now we have a Future[Iterable[Int]]
|
|
||||||
Future<Iterable<Integer>> futureListOfInts = sequence(listOfFutureInts);
|
|
||||||
|
|
||||||
// Find the sum of the odd numbers
|
|
||||||
Long totalSum = futureListOfInts.map(
|
|
||||||
new Function<LinkedList<Integer>, Long>() {
|
|
||||||
public Long apply(LinkedList<Integer> ints) {
|
|
||||||
long sum = 0;
|
|
||||||
for(Integer i : ints)
|
|
||||||
sum += i;
|
|
||||||
return sum;
|
|
||||||
}
|
|
||||||
}).get();
|
|
||||||
|
|
||||||
To better explain what happened in the example, ``Future.sequence`` is taking the ``Iterable<Future<Integer>>`` and turning it into a ``Future<Iterable<Integer>>``. We can then use ``map`` to work with the ``Iterable<Integer>`` directly, and we aggregate the sum of the ``Iterable``.
|
To better explain what happened in the example, ``Future.sequence`` is taking the ``Iterable<Future<Integer>>`` and turning it into a ``Future<Iterable<Integer>>``. We can then use ``map`` to work with the ``Iterable<Integer>`` directly, and we aggregate the sum of the ``Iterable``.
|
||||||
|
|
||||||
The ``traverse`` method is similar to ``sequence``, but it takes a sequence of ``A``s and applies a function from ``A`` to ``Future<B>`` and returns a ``Future<Iterable<B>>``, enabling parallel ``map`` over the sequence, if you use ``Futures.future`` to create the ``Future``.
|
The ``traverse`` method is similar to ``sequence``, but it takes a sequence of ``A``s and applies a function from ``A`` to ``Future<B>`` and returns a ``Future<Iterable<B>>``, enabling parallel ``map`` over the sequence, if you use ``Futures.future`` to create the ``Future``.
|
||||||
|
|
||||||
.. code-block:: java
|
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
|
||||||
|
:include: imports4,traverse
|
||||||
import akka.dispatch.Future;
|
|
||||||
import static akka.dispatch.Futures.traverse;
|
|
||||||
import static akka.dispatch.Futures.future;
|
|
||||||
import java.lang.Iterable;
|
|
||||||
import akka.japi.Function;
|
|
||||||
|
|
||||||
Iterable<String> listStrings = ... //Just a sequence of Strings
|
|
||||||
|
|
||||||
Future<Iterable<String>> result = traverse(listStrings, new Function<String,Future<String>>() {
|
|
||||||
public Future<String> apply(final String r) {
|
|
||||||
return future(new Callable<String>() {
|
|
||||||
public String call() {
|
|
||||||
return r.toUpperCase();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
result.get(); //Returns the sequence of strings as upper case
|
|
||||||
|
|
||||||
It's as simple as that!
|
It's as simple as that!
|
||||||
|
|
||||||
Then there's a method that's called ``fold`` that takes a start-value, a sequence of ``Future``:s and a function from the type of the start-value, a timeout, and the type of the futures and returns something with the same type as the start-value, and then applies the function to all elements in the sequence of futures, non-blockingly, the execution will run on the Thread of the last completing Future in the sequence.
|
Then there's a method that's called ``fold`` that takes a start-value, a sequence of ``Future``:s and a function from the type of the start-value, a timeout, and the type of the futures and returns something with the same type as the start-value, and then applies the function to all elements in the sequence of futures, non-blockingly, the execution will run on the Thread of the last completing Future in the sequence.
|
||||||
|
|
||||||
.. code-block:: java
|
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
|
||||||
|
:include: imports5,fold
|
||||||
import akka.dispatch.Future;
|
|
||||||
import static akka.dispatch.Futures.fold;
|
|
||||||
import java.lang.Iterable;
|
|
||||||
import akka.japi.Function2;
|
|
||||||
|
|
||||||
Iterable<Future<String>> futures = ... //A sequence of Futures, in this case Strings
|
|
||||||
|
|
||||||
Future<String> result = fold("", 15000, futures, new Function2<String, String, String>() { //Start value is the empty string, timeout is 15 seconds
|
|
||||||
public String apply(String r, String t) {
|
|
||||||
return r + t; //Just concatenate
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
result.get(); // Will produce a String that says "testtesttesttest"(... and so on).
|
|
||||||
|
|
||||||
That's all it takes!
|
That's all it takes!
|
||||||
|
|
||||||
|
|
||||||
If the sequence passed to ``fold`` is empty, it will return the start-value, in the case above, that will be 0. In some cases you don't have a start-value and you're able to use the value of the first completing Future in the sequence as the start-value, you can use ``reduce``, it works like this:
|
If the sequence passed to ``fold`` is empty, it will return the start-value, in the case above, that will be empty String. In some cases you don't have a start-value and you're able to use the value of the first completing Future in the sequence as the start-value, you can use ``reduce``, it works like this:
|
||||||
|
|
||||||
.. code-block:: java
|
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
|
||||||
|
:include: imports6,reduce
|
||||||
import akka.dispatch.Future;
|
|
||||||
import static akka.dispatch.Futures.reduce;
|
|
||||||
import java.util.Iterable;
|
|
||||||
import akka.japi.Function2;
|
|
||||||
|
|
||||||
Iterable<Future<String>> futures = ... //A sequence of Futures, in this case Strings
|
|
||||||
|
|
||||||
Future<String> result = reduce(futures, 15000, new Function2<String, String, String>() { //Timeout is 15 seconds
|
|
||||||
public String apply(String r, String t) {
|
|
||||||
return r + t; //Just concatenate
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
result.get(); // Will produce a String that says "testtesttesttest"(... and so on).
|
|
||||||
|
|
||||||
Same as with ``fold``, the execution will be done by the Thread that completes the last of the Futures, you can also parallize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again.
|
Same as with ``fold``, the execution will be done by the Thread that completes the last of the Futures, you can also parallize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again.
|
||||||
|
|
||||||
|
|
@ -266,4 +109,4 @@ This is just a sample of what can be done.
|
||||||
Exceptions
|
Exceptions
|
||||||
----------
|
----------
|
||||||
|
|
||||||
Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently. It doesn't matter if an ``UntypedActor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``, calling ``get`` will cause it to be thrown again so it can be handled properly.
|
Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently. It doesn't matter if an ``UntypedActor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``, calling ``Await.result`` will cause it to be thrown again so it can be handled properly.
|
||||||
|
|
@ -55,7 +55,7 @@ The first method for working with ``Future`` functionally is ``map``. This metho
|
||||||
.. includecode:: code/akka/docs/future/FutureDocSpec.scala
|
.. includecode:: code/akka/docs/future/FutureDocSpec.scala
|
||||||
:include: map
|
:include: map
|
||||||
|
|
||||||
In this example we are joining two strings together within a Future. Instead of waiting for this to complete, we apply our function that calculates the length of the string using the ``map`` method. Now we have a second ``Future`` that will eventually contain an ``Int``. When our original ``Future`` completes, it will also apply our function and complete the second ``Future`` with it's result. When we finally get the result, it will contain the number 10. Our original ``Future`` still contains the string "HelloWorld" and is unaffected by the ``map``.
|
In this example we are joining two strings together within a ``Future``. Instead of waiting for this to complete, we apply our function that calculates the length of the string using the ``map`` method. Now we have a second ``Future`` that will eventually contain an ``Int``. When our original ``Future`` completes, it will also apply our function and complete the second ``Future`` with it's result. When we finally get the result, it will contain the number 10. Our original ``Future`` still contains the string "HelloWorld" and is unaffected by the ``map``.
|
||||||
|
|
||||||
The ``map`` method is fine if we are modifying a single ``Future``, but if 2 or more ``Future``\s are involved ``map`` will not allow you to combine them together:
|
The ``map`` method is fine if we are modifying a single ``Future``, but if 2 or more ``Future``\s are involved ``map`` will not allow you to combine them together:
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue