Merge remote-tracking branch 'origin/master' into wip-1644-programmatic-deploy-∂π

This commit is contained in:
Roland 2012-02-03 09:49:04 +01:00
commit 45140b465e
306 changed files with 14713 additions and 11320 deletions

View file

@ -40,9 +40,9 @@ public class FaultHandlingTestBase {
//#strategy
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
new Function<Throwable, Action>() {
new Function<Throwable, Directive>() {
@Override
public Action apply(Throwable t) {
public Directive apply(Throwable t) {
if (t instanceof ArithmeticException) {
return resume();
} else if (t instanceof NullPointerException) {
@ -78,9 +78,9 @@ public class FaultHandlingTestBase {
//#strategy2
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
new Function<Throwable, Action>() {
new Function<Throwable, Directive>() {
@Override
public Action apply(Throwable t) {
public Directive apply(Throwable t) {
if (t instanceof ArithmeticException) {
return resume();
} else if (t instanceof NullPointerException) {

View file

@ -12,6 +12,7 @@ import akka.actor.Props;
//#import-future
import akka.dispatch.Future;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Await;
import akka.util.Duration;
import akka.util.Timeout;
@ -37,16 +38,16 @@ import akka.util.Duration;
import akka.actor.ActorTimeoutException;
//#import-gracefulStop
//#import-askPipeTo
//#import-askPipe
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipeTo;
import static akka.pattern.Patterns.pipe;
import akka.dispatch.Future;
import akka.dispatch.Futures;
import akka.util.Duration;
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
//#import-askPipeTo
//#import-askPipe
import akka.actor.Props;
import akka.actor.UntypedActor;
@ -223,12 +224,12 @@ public class UntypedActorDocTestBase {
}
@Test
public void usePatternsAskPipeTo() {
public void usePatternsAskPipe() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef actorA = system.actorOf(new Props(MyUntypedActor.class));
ActorRef actorB = system.actorOf(new Props(MyUntypedActor.class));
ActorRef actorC = system.actorOf(new Props(MyUntypedActor.class));
//#ask-pipeTo
//#ask-pipe
final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>();
@ -236,8 +237,8 @@ public class UntypedActorDocTestBase {
futures.add(ask(actorB, "reqeest", t)); // using timeout from above
final Future<Iterable<Object>> aggregate = Futures.sequence(futures, system.dispatcher());
final Future<Result> transformed = aggregate.map(new akka.japi.Function<Iterable<Object>, Result>() {
final Future<Result> transformed = aggregate.map(new Mapper<Iterable<Object>, Result>() {
public Result apply(Iterable<Object> coll) {
final Iterator<Object> it = coll.iterator();
final String s = (String) it.next();
@ -246,8 +247,8 @@ public class UntypedActorDocTestBase {
}
});
pipeTo(transformed, actorC);
//#ask-pipeTo
pipe(transformed).to(actorC);
//#ask-pipe
system.shutdown();
}

View file

@ -11,6 +11,7 @@ import java.util.List;
import java.util.Map;
import akka.actor.*;
import akka.dispatch.Mapper;
import akka.japi.Function;
import akka.util.Duration;
import akka.util.Timeout;
@ -19,9 +20,11 @@ import akka.event.LoggingAdapter;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import static akka.japi.Util.manifest;
import static akka.actor.SupervisorStrategy.*;
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipeTo;
import static akka.pattern.Patterns.pipe;
import static akka.docs.actor.japi.FaultHandlingDocSample.WorkerApi.*;
import static akka.docs.actor.japi.FaultHandlingDocSample.CounterServiceApi.*;
@ -115,9 +118,9 @@ public class FaultHandlingDocSample {
// Stop the CounterService child if it throws ServiceUnavailable
private static SupervisorStrategy strategy = new OneForOneStrategy(-1, Duration.Inf(),
new Function<Throwable, Action>() {
new Function<Throwable, Directive>() {
@Override
public Action apply(Throwable t) {
public Directive apply(Throwable t) {
if (t instanceof ServiceUnavailable) {
return stop();
} else {
@ -142,11 +145,14 @@ public class FaultHandlingDocSample {
counterService.tell(new Increment(1), getSelf());
// Send current progress to the initial sender
pipeTo(ask(counterService, GetCurrentCount, askTimeout).map(new Function<CurrentCount, Progress>() {
public Progress apply(CurrentCount c) {
return new Progress(100.0 * c.count / totalCount);
}
}), progressListener);
pipe(ask(counterService, GetCurrentCount, askTimeout)
.mapTo(manifest(CurrentCount.class))
.map(new Mapper<CurrentCount, Progress>() {
public Progress apply(CurrentCount c) {
return new Progress(100.0 * c.count / totalCount);
}
}))
.to(progressListener);
} else {
unhandled(msg);
}
@ -224,9 +230,9 @@ public class FaultHandlingDocSample {
// Restart the storage child when StorageException is thrown.
// After 3 restarts within 5 seconds it will be stopped.
private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.parse("5 seconds"),
new Function<Throwable, Action>() {
new Function<Throwable, Directive>() {
@Override
public Action apply(Throwable t) {
public Directive apply(Throwable t) {
if (t instanceof StorageException) {
return restart();
} else {

View file

@ -44,7 +44,7 @@ public class AgentDocTest {
@Test
public void createAndClose() {
//#create
//#create
ActorSystem system = ActorSystem.create("app");
Agent<Integer> agent = new Agent<Integer>(5, system);

View file

@ -4,12 +4,10 @@
package akka.docs.future;
//#imports1
import akka.dispatch.Promise;
import akka.dispatch.*;
import akka.japi.Procedure;
import akka.japi.Procedure2;
import akka.util.Timeout;
import akka.dispatch.Await;
import akka.dispatch.Future;
//#imports1
@ -57,7 +55,6 @@ import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.pattern.Patterns;
import static org.junit.Assert.*;
@ -110,7 +107,7 @@ public class FutureDocTestBase {
}
}, system.dispatcher());
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
public Integer apply(String s) {
return s.length();
}
@ -131,7 +128,7 @@ public class FutureDocTestBase {
}
}, system.dispatcher());
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
public Integer apply(String s) {
return s.length();
}
@ -153,7 +150,7 @@ public class FutureDocTestBase {
Thread.sleep(100);
Future<Integer> f2 = f1.map(new Function<String, Integer>() {
Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
public Integer apply(String s) {
return s.length();
}
@ -173,7 +170,7 @@ public class FutureDocTestBase {
}
}, system.dispatcher());
Future<Integer> f2 = f1.flatMap(new Function<String, Future<Integer>>() {
Future<Integer> f2 = f1.flatMap(new Mapper<String, Future<Integer>>() {
public Future<Integer> apply(final String s) {
return future(new Callable<Integer>() {
public Integer call() {
@ -204,7 +201,7 @@ public class FutureDocTestBase {
// Find the sum of the odd numbers
Future<Long> futureSum = futureListOfInts.map(
new Function<Iterable<Integer>, Long>() {
new Mapper<Iterable<Integer>, Long>() {
public Long apply(Iterable<Integer> ints) {
long sum = 0;
for (Integer i : ints)
@ -306,24 +303,87 @@ public class FutureDocTestBase {
//#filter
Future<Integer> future1 = Futures.successful(4, system.dispatcher());
Future<Integer> successfulFilter =
future1.filter(new Function<Integer, Boolean>() {
public Boolean apply(Integer i) { return i % 2 == 0; }
future1.filter(new Filter<Integer>() {
public boolean filter(Integer i) { return i % 2 == 0; }
});
Future<Integer> failedFilter =
future1.filter(new Function<Integer, Boolean>() {
public Boolean apply(Integer i) { return i % 2 != 0; }
future1.filter(new Filter<Integer>() {
public boolean filter(Integer i) { return i % 2 != 0; }
});
//When filter fails, the returned Future will be failed with a scala.MatchError
//#filter
}
public void sendToTheInternetz(String s) {
}
public void sendToIssueTracker(Throwable t) {
}
@Test public void useAndThen() {
//#and-then
Future<String> future1 = Futures.successful("value", system.dispatcher()).
andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null) sendToIssueTracker(failure);
}
}).andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (result != null) sendToTheInternetz(result);
}
});
//#and-then
}
@Test public void useRecover() {
//#recover
Future<Integer> future = future(new Callable<Integer>() {
public Integer call() {
return 1 / 0;
}
}, system.dispatcher()).recover(new Recover<Integer>() {
public Integer recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException) return 0;
else throw problem;
}
});
int result = Await.result(future, Duration.create(1, SECONDS));
assertEquals(result, 0);
//#recover
}
@Test public void useTryRecover() {
//#try-recover
Future<Integer> future = future(new Callable<Integer>() {
public Integer call() {
return 1 / 0;
}
}, system.dispatcher()).recoverWith(new Recover<Future<Integer>>() {
public Future<Integer> recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException) {
return future(new Callable<Integer>() {
public Integer call() {
return 0;
}
}, system.dispatcher());
}
else throw problem;
}
});
int result = Await.result(future, Duration.create(1, SECONDS));
assertEquals(result, 0);
//#try-recover
}
@Test public void useOnSuccessOnFailureAndOnComplete() {
{
Future<String> future = Futures.successful("foo", system.dispatcher());
//#onSuccess
future.onSuccess(new Procedure<String>() {
public void apply(String result) {
future.onSuccess(new OnSuccess<String>() {
public void onSuccess(String result) {
if ("bar" == result) {
//Do something if it resulted in "bar"
} else {
@ -337,8 +397,8 @@ public class FutureDocTestBase {
Future<String> future =
Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher());
//#onFailure
future.onFailure( new Procedure<Throwable>() {
public void apply(Throwable failure) {
future.onFailure( new OnFailure() {
public void onFailure(Throwable failure) {
if (failure instanceof IllegalStateException) {
//Do something if it was this particular failure
} else {
@ -351,8 +411,8 @@ public class FutureDocTestBase {
{
Future<String> future = Futures.successful("foo", system.dispatcher());
//#onComplete
future.onComplete(new Procedure2<Throwable, String>() {
public void apply(Throwable failure, String result) {
future.onComplete(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null) {
//We got a failure, handle it here
} else {
@ -370,7 +430,7 @@ public class FutureDocTestBase {
Future<String> future1 = Futures.successful("foo", system.dispatcher());
Future<String> future2 = Futures.successful("bar", system.dispatcher());
Future<String> future3 =
future1.zip(future2).map(new Function<scala.Tuple2<String,String>, String>() {
future1.zip(future2).map(new Mapper<scala.Tuple2<String,String>, String>() {
public String apply(scala.Tuple2<String,String> zipped) {
return zipped._1() + " " + zipped._2();
}
@ -382,7 +442,7 @@ public class FutureDocTestBase {
}
{
//#or
//#fallback-to
Future<String> future1 =
Futures.failed(new IllegalStateException("OHNOES1"), system.dispatcher());
Future<String> future2 =
@ -390,10 +450,10 @@ public class FutureDocTestBase {
Future<String> future3 =
Futures.successful("bar", system.dispatcher());
Future<String> future4 =
future1.or(future2).or(future3); // Will have "bar" in this case
future1.fallbackTo(future2).fallbackTo(future3); // Will have "bar" in this case
String result = Await.result(future4, Duration.create(1, SECONDS));
assertEquals("bar", result);
//#or
//#fallback-to
}
}

View file

@ -54,6 +54,12 @@ Loading from Configuration
To be able to load extensions from your Akka configuration you must add FQCNs of implementations of either ``ExtensionId`` or ``ExtensionIdProvider``
in the "akka.extensions" section of the config you provide to your ``ActorSystem``.
::
akka {
extensions = ["akka.docs.extension.ExtensionDocTestBase.CountExtension"]
}
Applicability
=============

View file

@ -1,5 +1,51 @@
.. _fault-tolerance-sample-java:
Diagrams of the Fault Tolerance Sample (Java)
----------------------------------------------
.. image:: ../images/faulttolerancesample-normal-flow.png
*The above diagram illustrates the normal message flow.*
**Normal flow:**
======= ==================================================================================
Step Description
======= ==================================================================================
1 The progress ``Listener`` starts the work.
2 The ``Worker`` schedules work by sending ``Do`` messages periodically to itself
3, 4, 5 When receiving ``Do`` the ``Worker`` tells the ``CounterService``
to increment the counter, three times. The ``Increment`` message is forwarded
to the ``Counter``, which updates its counter variable and sends current value
to the ``Storage``.
6, 7 The ``Worker`` asks the ``CounterService`` of current value of the counter and pipes
the result back to the ``Listener``.
======= ==================================================================================
.. image:: ../images/faulttolerancesample-failure-flow.png
*The above diagram illustrates what happens in case of storage failure.*
**Failure flow:**
=========== ==================================================================================
Step Description
=========== ==================================================================================
1 The ``Storage`` throws ``StorageException``.
2 The ``CounterService`` is supervisor of the ``Storage`` and restarts the
``Storage`` when ``StorageException`` is thrown.
3, 4, 5, 6 The ``Storage`` continues to fail and is restarted.
7 After 3 failures and restarts within 5 seconds the ``Storage`` is stopped by its
supervisor, i.e. the ``CounterService``.
8 The ``CounterService`` is also watching the ``Storage`` for termination and
receives the ``Terminated`` message when the ``Storage`` has been stopped ...
9, 10, 11 and tells the ``Counter`` that there is no ``Storage``.
12 The ``CounterService`` schedules a ``Reconnect`` message to itself.
13, 14 When it receives the ``Reconnect`` message it creates a new ``Storage`` ...
15, 16 and tells the the ``Counter`` to use the new ``Storage``
=========== ==================================================================================
Full Source Code of the Fault Tolerance Sample (Java)
------------------------------------------------------

View file

@ -43,7 +43,7 @@ For the sake of demonstration let us consider the following strategy:
:include: strategy
I have chosen a few well-known exception types in order to demonstrate the
application of the fault handling actions described in :ref:`supervision`.
application of the fault handling directives described in :ref:`supervision`.
First off, it is a one-for-one strategy, meaning that each child is treated
separately (an all-for-one strategy works very similarly, the only difference
is that any decision is applied to all children of the supervisor, not only the
@ -71,7 +71,7 @@ in the same way as the default strategy defined above.
Test Application
----------------
The following section shows the effects of the different actions in practice,
The following section shows the effects of the different directives in practice,
wherefor a test setup is needed. First off, we need a suitable supervisor:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
@ -93,13 +93,13 @@ Let us create actors:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: create
The first test shall demonstrate the ``Resume`` action, so we try it out by
The first test shall demonstrate the ``Resume`` directive, so we try it out by
setting some non-initial state in the actor and have it fail:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: resume
As you can see the value 42 survives the fault handling action. Now, if we
As you can see the value 42 survives the fault handling directive. Now, if we
change the failure to a more serious ``NullPointerException``, that will no
longer be the case:
@ -113,7 +113,7 @@ terminated by the supervisor:
:include: stop
Up to now the supervisor was completely unaffected by the childs failure,
because the actions set did handle it. In case of an ``Exception``, this is not
because the directives set did handle it. In case of an ``Exception``, this is not
true anymore and the supervisor escalates the failure.
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
@ -123,7 +123,7 @@ The supervisor itself is supervised by the top-level actor provided by the
:class:`ActorSystem`, which has the default policy to restart in case of all
``Exception`` cases (with the notable exceptions of
``ActorInitializationException`` and ``ActorKilledException``). Since the
default action in case of a restart is to kill all children, we expected our poor
default directive in case of a restart is to kill all children, we expected our poor
child not to survive this failure.
In case this is not desired (which depends on the use case), we need to use a

View file

@ -67,7 +67,7 @@ These allow you to create 'pipelines' or 'streams' that the result will travel t
Future is a Monad
^^^^^^^^^^^^^^^^^
The first method for working with ``Future`` functionally is ``map``. This method takes a ``Function`` which performs
The first method for working with ``Future`` functionally is ``map``. This method takes a ``Mapper`` which performs
some operation on the result of the ``Future``, and returning a new result.
The return value of the ``map`` method is another ``Future`` that will contain the new result:
@ -176,14 +176,26 @@ For this Akka supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: onComplete
Ordering
--------
Since callbacks are executed in any order and potentially in parallel,
it can be tricky at the times when you need sequential ordering of operations.
But there's a solution! And it's name is ``andThen``, and it creates a new Future with
the specified callback, a Future that will have the same result as the Future it's called on,
which allows for ordering like in the following sample:
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: and-then
Auxiliary methods
-----------------
``Future`` ``or`` combines 2 Futures into a new ``Future``, and will hold the successful value of the second ``Future`
``Future`` ``fallbackTo`` combines 2 Futures into a new ``Future``, and will hold the successful value of the second ``Future`
if the first ``Future`` fails.
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: or
:include: fallback-to
You can also combine two Futures into a new ``Future`` that will hold a tuple of the two Futures successful results,
using the ``zip`` operation.
@ -197,4 +209,22 @@ Exceptions
Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently.
It doesn't matter if an ``UntypedActor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught
the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``,
calling ``Await.result`` will cause it to be thrown again so it can be handled properly.
calling ``Await.result`` will cause it to be thrown again so it can be handled properly.
It is also possible to handle an ``Exception`` by returning a different result.
This is done with the ``recover`` method. For example:
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: recover
In this example, if the actor replied with a ``akka.actor.Status.Failure`` containing the ``ArithmeticException``,
our ``Future`` would have a result of 0. The ``recover`` method works very similarly to the standard try/catch blocks,
so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way
it will behave as if we hadn't used the ``recover`` method.
You can also use the ``recoverWith`` method, which has the same relationship to ``recover`` as ``flatMap` has to ``map``,
and is use like this:
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: try-recover

View file

@ -25,7 +25,7 @@ to your ``application.conf`` file::
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteSupport"
transport = "akka.remote.netty.NettyRemoteTransport"
server {
hostname = "127.0.0.1"
port = 2552

View file

@ -323,15 +323,15 @@ Ask: Send-And-Receive-Future
The ``ask`` pattern involves actors as well as futures, hence it is offered as
a use pattern rather than a method on :class:`ActorRef`:
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#import-askPipeTo
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#import-askPipe
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#ask-pipeTo
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#ask-pipe
This example demonstrates ``ask`` together with the ``pipeTo`` pattern on
This example demonstrates ``ask`` together with the ``pipe`` pattern on
futures, because this is likely to be a common combination. Please note that
all of the above is completely non-blocking and asynchronous: ``ask`` produces
a :class:`Future`, two of which are composed into a new future using the
:meth:`Futures.sequence` and :meth:`map` methods and then ``pipeTo`` installs
:meth:`Futures.sequence` and :meth:`map` methods and then ``pipe`` installs
an ``onComplete``-handler on the future to effect the submission of the
aggregated :class:`Result` to another actor.