Merge pull request #267 from jboner/wip-1722-fault-sample-java-patriknw
DOC: Java version of the fault tolerance sample. See #1722
This commit is contained in:
commit
ef80eb1ccd
7 changed files with 572 additions and 50 deletions
|
|
@ -227,8 +227,8 @@ abstract class SupervisorStrategy {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restart all actors linked to the same supervisor when one fails,
|
* Restart all child actors when one fails
|
||||||
* @param maxNrOfRetries the number of times an actor is allowed to be restarted
|
* @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit
|
||||||
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||||
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
||||||
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
||||||
|
|
@ -270,8 +270,8 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restart an actor when it fails
|
* Restart a child actor when it fails
|
||||||
* @param maxNrOfRetries the number of times an actor is allowed to be restarted
|
* @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit
|
||||||
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||||
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
||||||
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,452 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.docs.actor.japi;
|
||||||
|
|
||||||
|
//#all
|
||||||
|
//#imports
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import akka.actor.*;
|
||||||
|
import akka.japi.Function;
|
||||||
|
import akka.util.Duration;
|
||||||
|
import akka.util.Timeout;
|
||||||
|
import akka.event.Logging;
|
||||||
|
import akka.event.LoggingAdapter;
|
||||||
|
import com.typesafe.config.Config;
|
||||||
|
import com.typesafe.config.ConfigFactory;
|
||||||
|
|
||||||
|
import static akka.actor.SupervisorStrategy.*;
|
||||||
|
import static akka.pattern.Patterns.ask;
|
||||||
|
import static akka.pattern.Patterns.pipeTo;
|
||||||
|
|
||||||
|
import static akka.docs.actor.japi.FaultHandlingDocSample.WorkerApi.*;
|
||||||
|
import static akka.docs.actor.japi.FaultHandlingDocSample.CounterServiceApi.*;
|
||||||
|
import static akka.docs.actor.japi.FaultHandlingDocSample.CounterApi.*;
|
||||||
|
import static akka.docs.actor.japi.FaultHandlingDocSample.StorageApi.*;
|
||||||
|
|
||||||
|
//#imports
|
||||||
|
|
||||||
|
public class FaultHandlingDocSample {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs the sample
|
||||||
|
*/
|
||||||
|
public static void main(String... args) {
|
||||||
|
Config config = ConfigFactory.parseString("akka.loglevel = DEBUG \n" + "akka.actor.debug.lifecycle = on");
|
||||||
|
|
||||||
|
ActorSystem system = ActorSystem.create("FaultToleranceSample", config);
|
||||||
|
ActorRef worker = system.actorOf(new Props(Worker.class), "worker");
|
||||||
|
ActorRef listener = system.actorOf(new Props(Listener.class), "listener");
|
||||||
|
// start the work and listen on progress
|
||||||
|
// note that the listener is used as sender of the tell,
|
||||||
|
// i.e. it will receive replies from the worker
|
||||||
|
worker.tell(Start, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listens on progress from the worker and shuts down the system when enough
|
||||||
|
* work has been done.
|
||||||
|
*/
|
||||||
|
public static class Listener extends UntypedActor {
|
||||||
|
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preStart() {
|
||||||
|
// If we don't get any progress within 15 seconds then the service is unavailable
|
||||||
|
getContext().setReceiveTimeout(Duration.parse("15 seconds"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onReceive(Object msg) {
|
||||||
|
log.debug("received message {}", msg);
|
||||||
|
if (msg instanceof CurrentCount) {
|
||||||
|
CurrentCount current = (CurrentCount) msg;
|
||||||
|
log.info("Current count for [{}] is [{}]", current.key, current.count);
|
||||||
|
if (current.count > 50) {
|
||||||
|
log.info("That's enough, shutting down");
|
||||||
|
getContext().system().shutdown();
|
||||||
|
}
|
||||||
|
} else if (msg == Actors.receiveTimeout()) {
|
||||||
|
// No progress within 15 seconds, ServiceUnavailable
|
||||||
|
log.error("Shutting down due to unavailable service");
|
||||||
|
getContext().system().shutdown();
|
||||||
|
} else {
|
||||||
|
unhandled(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
public interface WorkerApi {
|
||||||
|
public static final Object Start = "Start";
|
||||||
|
public static final Object Do = "Do";
|
||||||
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Worker performs some work when it receives the Start message. It will
|
||||||
|
* continuously notify the sender of the Start message of current progress.
|
||||||
|
* The Worker supervise the CounterService.
|
||||||
|
*/
|
||||||
|
public static class Worker extends UntypedActor {
|
||||||
|
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||||
|
final Timeout askTimeout = new Timeout(Duration.parse("5 seconds"));
|
||||||
|
|
||||||
|
// The sender of the initial Start message will continuously be notified about progress
|
||||||
|
ActorRef progressListener;
|
||||||
|
final ActorRef counterService = getContext().actorOf(new Props(CounterService.class), "counter");
|
||||||
|
|
||||||
|
// Stop the CounterService child if it throws ServiceUnavailable
|
||||||
|
private static SupervisorStrategy strategy = new OneForOneStrategy(-1, Duration.Inf(),
|
||||||
|
new Function<Throwable, Action>() {
|
||||||
|
@Override
|
||||||
|
public Action apply(Throwable t) {
|
||||||
|
if (t instanceof ServiceUnavailable) {
|
||||||
|
return stop();
|
||||||
|
} else {
|
||||||
|
return escalate();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SupervisorStrategy supervisorStrategy() {
|
||||||
|
return strategy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onReceive(Object msg) {
|
||||||
|
log.debug("received message {}", msg);
|
||||||
|
if (msg.equals(Start) && progressListener == null) {
|
||||||
|
progressListener = getSender();
|
||||||
|
getContext().system().scheduler().schedule(Duration.Zero(), Duration.parse("1 second"), getSelf(), Do);
|
||||||
|
} else if (msg.equals(Do)) {
|
||||||
|
counterService.tell(new Increment(1), getSelf());
|
||||||
|
counterService.tell(new Increment(1), getSelf());
|
||||||
|
counterService.tell(new Increment(1), getSelf());
|
||||||
|
|
||||||
|
// Send current count to the initial sender
|
||||||
|
pipeTo(ask(counterService, GetCurrentCount, askTimeout), progressListener);
|
||||||
|
} else {
|
||||||
|
unhandled(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
public interface CounterServiceApi {
|
||||||
|
|
||||||
|
public static final Object GetCurrentCount = "GetCurrentCount";
|
||||||
|
|
||||||
|
public static class CurrentCount {
|
||||||
|
public final String key;
|
||||||
|
public final long count;
|
||||||
|
|
||||||
|
public CurrentCount(String key, long count) {
|
||||||
|
this.key = key;
|
||||||
|
this.count = count;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s(%s, %s)", getClass().getSimpleName(), key, count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Increment {
|
||||||
|
public final long n;
|
||||||
|
|
||||||
|
public Increment(long n) {
|
||||||
|
this.n = n;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s(%s)", getClass().getSimpleName(), n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ServiceUnavailable extends RuntimeException {
|
||||||
|
public ServiceUnavailable(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the value received in Increment message to a persistent counter.
|
||||||
|
* Replies with CurrentCount when it is asked for CurrentCount. CounterService
|
||||||
|
* supervise Storage and Counter.
|
||||||
|
*/
|
||||||
|
public static class CounterService extends UntypedActor {
|
||||||
|
|
||||||
|
// Reconnect message
|
||||||
|
static final Object Reconnect = "Reconnect";
|
||||||
|
|
||||||
|
private static class SenderMsgPair {
|
||||||
|
final ActorRef sender;
|
||||||
|
final Object msg;
|
||||||
|
|
||||||
|
SenderMsgPair(ActorRef sender, Object msg) {
|
||||||
|
this.msg = msg;
|
||||||
|
this.sender = sender;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||||
|
final String key = getSelf().path().name();
|
||||||
|
ActorRef storage;
|
||||||
|
ActorRef counter;
|
||||||
|
final List<SenderMsgPair> backlog = new ArrayList<SenderMsgPair>();
|
||||||
|
final int MAX_BACKLOG = 10000;
|
||||||
|
|
||||||
|
// Restart the storage child when StorageException is thrown.
|
||||||
|
// After 3 restarts within 5 seconds it will be stopped.
|
||||||
|
private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.parse("5 seconds"),
|
||||||
|
new Function<Throwable, Action>() {
|
||||||
|
@Override
|
||||||
|
public Action apply(Throwable t) {
|
||||||
|
if (t instanceof StorageException) {
|
||||||
|
return restart();
|
||||||
|
} else {
|
||||||
|
return escalate();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SupervisorStrategy supervisorStrategy() {
|
||||||
|
return strategy;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preStart() {
|
||||||
|
initStorage();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The child storage is restarted in case of failure, but after 3 restarts,
|
||||||
|
* and still failing it will be stopped. Better to back-off than
|
||||||
|
* continuously failing. When it has been stopped we will schedule a
|
||||||
|
* Reconnect after a delay. Watch the child so we receive Terminated message
|
||||||
|
* when it has been terminated.
|
||||||
|
*/
|
||||||
|
void initStorage() {
|
||||||
|
storage = getContext().watch(getContext().actorOf(new Props(Storage.class), "storage"));
|
||||||
|
// Tell the counter, if any, to use the new storage
|
||||||
|
if (counter != null)
|
||||||
|
counter.tell(new UseStorage(storage), getSelf());
|
||||||
|
// We need the initial value to be able to operate
|
||||||
|
storage.tell(new Get(key), getSelf());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onReceive(Object msg) {
|
||||||
|
log.debug("received message {}", msg);
|
||||||
|
if (msg instanceof Entry && ((Entry) msg).key.equals(key) && counter == null) {
|
||||||
|
// Reply from Storage of the initial value, now we can create the Counter
|
||||||
|
final long value = ((Entry) msg).value;
|
||||||
|
counter = getContext().actorOf(new Props().withCreator(new UntypedActorFactory() {
|
||||||
|
public Actor create() {
|
||||||
|
return new Counter(key, value);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
// Tell the counter to use current storage
|
||||||
|
counter.tell(new UseStorage(storage), getSelf());
|
||||||
|
// and send the buffered backlog to the counter
|
||||||
|
for (SenderMsgPair each : backlog) {
|
||||||
|
counter.tell(each.msg, each.sender);
|
||||||
|
}
|
||||||
|
backlog.clear();
|
||||||
|
} else if (msg instanceof Increment) {
|
||||||
|
forwardOrPlaceInBacklog(msg);
|
||||||
|
} else if (msg.equals(GetCurrentCount)) {
|
||||||
|
forwardOrPlaceInBacklog(msg);
|
||||||
|
} else if (msg instanceof Terminated) {
|
||||||
|
// After 3 restarts the storage child is stopped.
|
||||||
|
// We receive Terminated because we watch the child, see initStorage.
|
||||||
|
storage = null;
|
||||||
|
// Tell the counter that there is no storage for the moment
|
||||||
|
counter.tell(new UseStorage(null), getSelf());
|
||||||
|
// Try to re-establish storage after while
|
||||||
|
getContext().system().scheduler().scheduleOnce(Duration.parse("10 seconds"), getSelf(), Reconnect);
|
||||||
|
} else if (msg.equals(Reconnect)) {
|
||||||
|
// Re-establish storage after the scheduled delay
|
||||||
|
initStorage();
|
||||||
|
} else {
|
||||||
|
unhandled(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void forwardOrPlaceInBacklog(Object msg) {
|
||||||
|
// We need the initial value from storage before we can start delegate to the counter.
|
||||||
|
// Before that we place the messages in a backlog, to be sent to the counter when
|
||||||
|
// it is initialized.
|
||||||
|
if (counter == null) {
|
||||||
|
if (backlog.size() >= MAX_BACKLOG)
|
||||||
|
throw new ServiceUnavailable("CounterService not available, lack of initial value");
|
||||||
|
backlog.add(new SenderMsgPair(getSender(), msg));
|
||||||
|
} else {
|
||||||
|
counter.forward(msg, getContext());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
public interface CounterApi {
|
||||||
|
public static class UseStorage {
|
||||||
|
public final ActorRef storage;
|
||||||
|
|
||||||
|
public UseStorage(ActorRef storage) {
|
||||||
|
this.storage = storage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s(%s)", getClass().getSimpleName(), storage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The in memory count variable that will send current value to the Storage,
|
||||||
|
* if there is any storage available at the moment.
|
||||||
|
*/
|
||||||
|
public static class Counter extends UntypedActor {
|
||||||
|
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||||
|
final String key;
|
||||||
|
long count;
|
||||||
|
ActorRef storage;
|
||||||
|
|
||||||
|
public Counter(String key, long initialValue) {
|
||||||
|
this.key = key;
|
||||||
|
this.count = initialValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onReceive(Object msg) {
|
||||||
|
log.debug("received message {}", msg);
|
||||||
|
if (msg instanceof UseStorage) {
|
||||||
|
storage = ((UseStorage) msg).storage;
|
||||||
|
storeCount();
|
||||||
|
} else if (msg instanceof Increment) {
|
||||||
|
count += ((Increment) msg).n;
|
||||||
|
storeCount();
|
||||||
|
} else if (msg.equals(GetCurrentCount)) {
|
||||||
|
getSender().tell(new CurrentCount(key, count), getSelf());
|
||||||
|
} else {
|
||||||
|
unhandled(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void storeCount() {
|
||||||
|
// Delegate dangerous work, to protect our valuable state.
|
||||||
|
// We can continue without storage.
|
||||||
|
if (storage != null) {
|
||||||
|
storage.tell(new Store(new Entry(key, count)), getSelf());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
public interface StorageApi {
|
||||||
|
|
||||||
|
public static class Store {
|
||||||
|
public final Entry entry;
|
||||||
|
|
||||||
|
public Store(Entry entry) {
|
||||||
|
this.entry = entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s(%s)", getClass().getSimpleName(), entry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Entry {
|
||||||
|
public final String key;
|
||||||
|
public final long value;
|
||||||
|
|
||||||
|
public Entry(String key, long value) {
|
||||||
|
this.key = key;
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s(%s, %s)", getClass().getSimpleName(), key, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Get {
|
||||||
|
public final String key;
|
||||||
|
|
||||||
|
public Get(String key) {
|
||||||
|
this.key = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s(%s)", getClass().getSimpleName(), key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class StorageException extends RuntimeException {
|
||||||
|
public StorageException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Saves key/value pairs to persistent storage when receiving Store message.
|
||||||
|
* Replies with current value when receiving Get message. Will throw
|
||||||
|
* StorageException if the underlying data store is out of order.
|
||||||
|
*/
|
||||||
|
public static class Storage extends UntypedActor {
|
||||||
|
|
||||||
|
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||||
|
final DummyDB db = DummyDB.instance;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onReceive(Object msg) {
|
||||||
|
log.debug("received message {}", msg);
|
||||||
|
if (msg instanceof Store) {
|
||||||
|
Store store = (Store) msg;
|
||||||
|
db.save(store.entry.key, store.entry.value);
|
||||||
|
} else if (msg instanceof Get) {
|
||||||
|
Get get = (Get) msg;
|
||||||
|
Long value = db.load(get.key);
|
||||||
|
getSender().tell(new Entry(get.key, value == null ? Long.valueOf(0L) : value), getSelf());
|
||||||
|
} else {
|
||||||
|
unhandled(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//#dummydb
|
||||||
|
public static class DummyDB {
|
||||||
|
public static final DummyDB instance = new DummyDB();
|
||||||
|
private final Map<String, Long> db = new HashMap<String, Long>();
|
||||||
|
|
||||||
|
private DummyDB() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void save(String key, Long value) throws StorageException {
|
||||||
|
if (11 <= value && value <= 14)
|
||||||
|
throw new StorageException("Simulated store failure " + value);
|
||||||
|
db.put(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized Long load(String key) throws StorageException {
|
||||||
|
return db.get(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#dummydb
|
||||||
|
}
|
||||||
|
//#all
|
||||||
7
akka-docs/java/fault-tolerance-sample.rst
Normal file
7
akka-docs/java/fault-tolerance-sample.rst
Normal file
|
|
@ -0,0 +1,7 @@
|
||||||
|
.. _fault-tolerance-sample-java:
|
||||||
|
|
||||||
|
Full Source Code of the Fault Tolerance Sample (Java)
|
||||||
|
------------------------------------------------------
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/japi/FaultHandlingDocSample.java#all
|
||||||
|
|
||||||
|
|
@ -12,9 +12,31 @@ children, and as such each actor defines fault handling supervisor strategy.
|
||||||
This strategy cannot be changed afterwards as it is an integral part of the
|
This strategy cannot be changed afterwards as it is an integral part of the
|
||||||
actor system’s structure.
|
actor system’s structure.
|
||||||
|
|
||||||
|
Fault Handling in Practice
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
First, let us look at a sample that illustrates one way to handle data store errors,
|
||||||
|
which is a typical source of failure in real world applications. Of course it depends
|
||||||
|
on the actual application what is possible to do when the data store is unavailable,
|
||||||
|
but in this sample we use a best effort re-connect approach.
|
||||||
|
|
||||||
|
Read the following source code. The inlined comments explain the different pieces of
|
||||||
|
the fault handling and why they are added. It is also highly recommended to run this
|
||||||
|
sample as it is easy to follow the log output to understand what is happening in runtime.
|
||||||
|
|
||||||
|
.. toctree::
|
||||||
|
|
||||||
|
fault-tolerance-sample
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/japi/FaultHandlingDocSample.java#all
|
||||||
|
:exclude: imports,messages,dummydb
|
||||||
|
|
||||||
Creating a Supervisor Strategy
|
Creating a Supervisor Strategy
|
||||||
------------------------------
|
------------------------------
|
||||||
|
|
||||||
|
The following sections explain the fault handling mechanism and alternatives
|
||||||
|
in more depth.
|
||||||
|
|
||||||
For the sake of demonstration let us consider the following strategy:
|
For the sake of demonstration let us consider the following strategy:
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
|
@ -26,12 +48,28 @@ First off, it is a one-for-one strategy, meaning that each child is treated
|
||||||
separately (an all-for-one strategy works very similarly, the only difference
|
separately (an all-for-one strategy works very similarly, the only difference
|
||||||
is that any decision is applied to all children of the supervisor, not only the
|
is that any decision is applied to all children of the supervisor, not only the
|
||||||
failing one). There are limits set on the restart frequency, namely maximum 10
|
failing one). There are limits set on the restart frequency, namely maximum 10
|
||||||
restarts per minute; each of these settings could be left out, which means
|
restarts per minute. ``-1`` and ``Duration.Inf()`` means that the respective limit
|
||||||
that the respective limit does not apply, leaving the possibility to specify an
|
does not apply, leaving the possibility to specify an absolute upper limit on the
|
||||||
absolute upper limit on the restarts or to make the restarts work infinitely.
|
restarts or to make the restarts work infinitely.
|
||||||
|
|
||||||
Practical Application
|
Default Supervisor Strategy
|
||||||
---------------------
|
---------------------------
|
||||||
|
|
||||||
|
``Escalate`` is used if the defined strategy doesn't cover the exception that was thrown.
|
||||||
|
|
||||||
|
When the supervisor strategy is not defined for an actor the following
|
||||||
|
exceptions are handled by default:
|
||||||
|
|
||||||
|
* ``ActorInitializationException`` will stop the failing child actor
|
||||||
|
* ``ActorKilledException`` will stop the failing child actor
|
||||||
|
* ``Exception`` will restart the failing child actor
|
||||||
|
* Other types of ``Throwable`` will be escalated to parent actor
|
||||||
|
|
||||||
|
If the exception escalate all the way up to the root guardian it will handle it
|
||||||
|
in the same way as the default strategy defined above.
|
||||||
|
|
||||||
|
Test Application
|
||||||
|
----------------
|
||||||
|
|
||||||
The following section shows the effects of the different actions in practice,
|
The following section shows the effects of the different actions in practice,
|
||||||
wherefor a test setup is needed. First off, we need a suitable supervisor:
|
wherefor a test setup is needed. First off, we need a suitable supervisor:
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@
|
||||||
package akka.docs.actor
|
package akka.docs.actor
|
||||||
|
|
||||||
//#all
|
//#all
|
||||||
|
//#imports
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.actor.SupervisorStrategy._
|
import akka.actor.SupervisorStrategy._
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
|
|
@ -12,13 +13,13 @@ import akka.util.Timeout
|
||||||
import akka.event.LoggingReceive
|
import akka.event.LoggingReceive
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
//#imports
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs the sample
|
* Runs the sample
|
||||||
*/
|
*/
|
||||||
object FaultHandlingDocSample extends App {
|
object FaultHandlingDocSample extends App {
|
||||||
import Worker._
|
import Worker._
|
||||||
import CounterService._
|
|
||||||
|
|
||||||
val config = ConfigFactory.parseString("""
|
val config = ConfigFactory.parseString("""
|
||||||
akka.loglevel = DEBUG
|
akka.loglevel = DEBUG
|
||||||
|
|
@ -30,35 +31,43 @@ object FaultHandlingDocSample extends App {
|
||||||
|
|
||||||
val system = ActorSystem("FaultToleranceSample", config)
|
val system = ActorSystem("FaultToleranceSample", config)
|
||||||
val worker = system.actorOf(Props[Worker], name = "worker")
|
val worker = system.actorOf(Props[Worker], name = "worker")
|
||||||
|
val listener = system.actorOf(Props[Listener], name = "listener")
|
||||||
// Create an Actor that start the work and listens to progress
|
// start the work and listen on progress
|
||||||
system.actorOf(Props(new Actor with ActorLogging {
|
// note that the listener is used as sender of the tell,
|
||||||
// If we don't get any progress within 15 seconds then the service is unavailable
|
// i.e. it will receive replies from the worker
|
||||||
context.setReceiveTimeout(15 seconds)
|
worker.tell(Start, sender = listener)
|
||||||
worker ! Start
|
|
||||||
|
|
||||||
def receive = {
|
|
||||||
case CurrentCount(key, count) ⇒
|
|
||||||
log.info("Current count for [{}] is [{}]", key, count)
|
|
||||||
if (count > 50) {
|
|
||||||
log.info("That's enough, shutting down")
|
|
||||||
system.shutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
case ReceiveTimeout ⇒
|
|
||||||
// No progress within 15 seconds, ServiceUnavailable
|
|
||||||
log.error("Shutting down due to unavailable service")
|
|
||||||
system.shutdown()
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listens on progress from the worker and shuts down the system when enough
|
||||||
|
* work has been done.
|
||||||
|
*/
|
||||||
|
class Listener extends Actor with ActorLogging {
|
||||||
|
import CounterService._
|
||||||
|
// If we don't get any progress within 15 seconds then the service is unavailable
|
||||||
|
context.setReceiveTimeout(15 seconds)
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
case CurrentCount(key, count) ⇒
|
||||||
|
log.info("Current count for [{}] is [{}]", key, count)
|
||||||
|
if (count > 50) {
|
||||||
|
log.info("That's enough, shutting down")
|
||||||
|
context.system.shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
case ReceiveTimeout ⇒
|
||||||
|
// No progress within 15 seconds, ServiceUnavailable
|
||||||
|
log.error("Shutting down due to unavailable service")
|
||||||
|
context.system.shutdown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
object Worker {
|
object Worker {
|
||||||
// Messages
|
|
||||||
case object Start
|
case object Start
|
||||||
case object Do
|
case object Do
|
||||||
}
|
}
|
||||||
|
//#messages
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Worker performs some work when it receives the `Start` message.
|
* Worker performs some work when it receives the `Start` message.
|
||||||
|
|
@ -94,14 +103,16 @@ class Worker extends Actor with ActorLogging {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
object CounterService {
|
object CounterService {
|
||||||
// Messages
|
|
||||||
case class Increment(n: Int)
|
case class Increment(n: Int)
|
||||||
case object GetCurrentCount
|
case object GetCurrentCount
|
||||||
case class CurrentCount(key: String, count: Long)
|
case class CurrentCount(key: String, count: Long)
|
||||||
case object Reconnect
|
|
||||||
class ServiceUnavailable(msg: String) extends RuntimeException(msg)
|
class ServiceUnavailable(msg: String) extends RuntimeException(msg)
|
||||||
|
|
||||||
|
private case object Reconnect
|
||||||
}
|
}
|
||||||
|
//#messages
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds the value received in `Increment` message to a persistent
|
* Adds the value received in `Increment` message to a persistent
|
||||||
|
|
@ -119,10 +130,10 @@ class CounterService extends Actor {
|
||||||
case _: Storage.StorageException ⇒ Restart
|
case _: Storage.StorageException ⇒ Restart
|
||||||
}
|
}
|
||||||
|
|
||||||
val key = context.self.path.name
|
val key = self.path.name
|
||||||
var storage: Option[ActorRef] = None
|
var storage: Option[ActorRef] = None
|
||||||
var counter: Option[ActorRef] = None
|
var counter: Option[ActorRef] = None
|
||||||
var backlog = IndexedSeq.empty[Any]
|
var backlog = IndexedSeq.empty[(ActorRef, Any)]
|
||||||
val MaxBacklog = 10000
|
val MaxBacklog = 10000
|
||||||
|
|
||||||
override def preStart() {
|
override def preStart() {
|
||||||
|
|
@ -152,7 +163,7 @@ class CounterService extends Actor {
|
||||||
// Tell the counter to use current storage
|
// Tell the counter to use current storage
|
||||||
c ! UseStorage(storage)
|
c ! UseStorage(storage)
|
||||||
// and send the buffered backlog to the counter
|
// and send the buffered backlog to the counter
|
||||||
backlog foreach { c ! _ }
|
for ((replyTo, msg) ← backlog) c.tell(msg, sender = replyTo)
|
||||||
backlog = IndexedSeq.empty
|
backlog = IndexedSeq.empty
|
||||||
|
|
||||||
case msg @ Increment(n) ⇒ forwardOrPlaceInBacklog(msg)
|
case msg @ Increment(n) ⇒ forwardOrPlaceInBacklog(msg)
|
||||||
|
|
@ -182,16 +193,17 @@ class CounterService extends Actor {
|
||||||
case None ⇒
|
case None ⇒
|
||||||
if (backlog.size >= MaxBacklog)
|
if (backlog.size >= MaxBacklog)
|
||||||
throw new ServiceUnavailable("CounterService not available, lack of initial value")
|
throw new ServiceUnavailable("CounterService not available, lack of initial value")
|
||||||
backlog = backlog :+ msg
|
backlog = backlog :+ (sender, msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
object Counter {
|
object Counter {
|
||||||
// Messages
|
|
||||||
case class UseStorage(storage: Option[ActorRef])
|
case class UseStorage(storage: Option[ActorRef])
|
||||||
}
|
}
|
||||||
|
//#messages
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The in memory count variable that will send current
|
* The in memory count variable that will send current
|
||||||
|
|
@ -228,13 +240,14 @@ class Counter(key: String, initialValue: Long) extends Actor {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
object Storage {
|
object Storage {
|
||||||
// Messages
|
|
||||||
case class Store(entry: Entry)
|
case class Store(entry: Entry)
|
||||||
case class Get(key: String)
|
case class Get(key: String)
|
||||||
case class Entry(key: String, value: Long)
|
case class Entry(key: String, value: Long)
|
||||||
class StorageException(msg: String) extends RuntimeException(msg)
|
class StorageException(msg: String) extends RuntimeException(msg)
|
||||||
}
|
}
|
||||||
|
//#messages
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Saves key/value pairs to persistent storage when receiving `Store` message.
|
* Saves key/value pairs to persistent storage when receiving `Store` message.
|
||||||
|
|
@ -252,9 +265,10 @@ class Storage extends Actor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//#dummydb
|
||||||
object DummyDB {
|
object DummyDB {
|
||||||
import Storage.StorageException
|
import Storage.StorageException
|
||||||
var db = Map[String, Long]()
|
private var db = Map[String, Long]()
|
||||||
|
|
||||||
@throws(classOf[StorageException])
|
@throws(classOf[StorageException])
|
||||||
def save(key: String, value: Long): Unit = synchronized {
|
def save(key: String, value: Long): Unit = synchronized {
|
||||||
|
|
@ -267,4 +281,5 @@ object DummyDB {
|
||||||
db.get(key)
|
db.get(key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
//#dummydb
|
||||||
//#all
|
//#all
|
||||||
|
|
|
||||||
7
akka-docs/scala/fault-tolerance-sample.rst
Normal file
7
akka-docs/scala/fault-tolerance-sample.rst
Normal file
|
|
@ -0,0 +1,7 @@
|
||||||
|
.. _fault-tolerance-sample-scala:
|
||||||
|
|
||||||
|
Full Source Code of the Fault Tolerance Sample (Scala)
|
||||||
|
------------------------------------------------------
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/FaultHandlingDocSample.scala#all
|
||||||
|
|
||||||
|
|
@ -24,7 +24,12 @@ Read the following source code. The inlined comments explain the different piece
|
||||||
the fault handling and why they are added. It is also highly recommended to run this
|
the fault handling and why they are added. It is also highly recommended to run this
|
||||||
sample as it is easy to follow the log output to understand what is happening in runtime.
|
sample as it is easy to follow the log output to understand what is happening in runtime.
|
||||||
|
|
||||||
|
.. toctree::
|
||||||
|
|
||||||
|
fault-tolerance-sample
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/actor/FaultHandlingDocSample.scala#all
|
.. includecode:: code/akka/docs/actor/FaultHandlingDocSample.scala#all
|
||||||
|
:exclude: imports,messages,dummydb
|
||||||
|
|
||||||
Creating a Supervisor Strategy
|
Creating a Supervisor Strategy
|
||||||
------------------------------
|
------------------------------
|
||||||
|
|
@ -43,7 +48,7 @@ First off, it is a one-for-one strategy, meaning that each child is treated
|
||||||
separately (an all-for-one strategy works very similarly, the only difference
|
separately (an all-for-one strategy works very similarly, the only difference
|
||||||
is that any decision is applied to all children of the supervisor, not only the
|
is that any decision is applied to all children of the supervisor, not only the
|
||||||
failing one). There are limits set on the restart frequency, namely maximum 10
|
failing one). There are limits set on the restart frequency, namely maximum 10
|
||||||
restarts per minute; each of these settings defaults to ``None`` which means
|
restarts per minute; each of these settings could be left out, which means
|
||||||
that the respective limit does not apply, leaving the possibility to specify an
|
that the respective limit does not apply, leaving the possibility to specify an
|
||||||
absolute upper limit on the restarts or to make the restarts work infinitely.
|
absolute upper limit on the restarts or to make the restarts work infinitely.
|
||||||
|
|
||||||
|
|
@ -57,14 +62,12 @@ Default Supervisor Strategy
|
||||||
``Escalate`` is used if the defined strategy doesn't cover the exception that was thrown.
|
``Escalate`` is used if the defined strategy doesn't cover the exception that was thrown.
|
||||||
|
|
||||||
When the supervisor strategy is not defined for an actor the following
|
When the supervisor strategy is not defined for an actor the following
|
||||||
exceptions are handled by default::
|
exceptions are handled by default:
|
||||||
|
|
||||||
OneForOneStrategy() {
|
* ``ActorInitializationException`` will stop the failing child actor
|
||||||
case _: ActorInitializationException ⇒ Stop
|
* ``ActorKilledException`` will stop the failing child actor
|
||||||
case _: ActorKilledException ⇒ Stop
|
* ``Exception`` will restart the failing child actor
|
||||||
case _: Exception ⇒ Restart
|
* Other types of ``Throwable`` will be escalated to parent actor
|
||||||
case _ ⇒ Escalate
|
|
||||||
}
|
|
||||||
|
|
||||||
If the exception escalate all the way up to the root guardian it will handle it
|
If the exception escalate all the way up to the root guardian it will handle it
|
||||||
in the same way as the default strategy defined above.
|
in the same way as the default strategy defined above.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue