Rearanged declaration of messages. Improvements from review. See #1722
This commit is contained in:
parent
4e0bd947be
commit
2aa72993d6
3 changed files with 135 additions and 106 deletions
|
|
@ -228,8 +228,8 @@ abstract class SupervisorStrategy {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restart all actors linked to the same supervisor when one fails,
|
* Restart all child actors when one fails
|
||||||
* @param maxNrOfRetries the number of times an actor is allowed to be restarted
|
* @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit
|
||||||
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||||
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
||||||
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
||||||
|
|
@ -271,8 +271,8 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restart an actor when it fails
|
* Restart a child actor when it fails
|
||||||
* @param maxNrOfRetries the number of times an actor is allowed to be restarted, -1 means no limit
|
* @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit
|
||||||
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||||
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
||||||
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,11 @@ import static akka.actor.SupervisorStrategy.*;
|
||||||
import static akka.pattern.Patterns.ask;
|
import static akka.pattern.Patterns.ask;
|
||||||
import static akka.pattern.Patterns.pipeTo;
|
import static akka.pattern.Patterns.pipeTo;
|
||||||
|
|
||||||
|
import static akka.docs.actor.japi.FaultHandlingDocSample.WorkerApi.*;
|
||||||
|
import static akka.docs.actor.japi.FaultHandlingDocSample.CounterServiceApi.*;
|
||||||
|
import static akka.docs.actor.japi.FaultHandlingDocSample.CounterApi.*;
|
||||||
|
import static akka.docs.actor.japi.FaultHandlingDocSample.StorageApi.*;
|
||||||
|
|
||||||
//#imports
|
//#imports
|
||||||
|
|
||||||
public class FaultHandlingDocSample {
|
public class FaultHandlingDocSample {
|
||||||
|
|
@ -31,16 +36,14 @@ public class FaultHandlingDocSample {
|
||||||
* Runs the sample
|
* Runs the sample
|
||||||
*/
|
*/
|
||||||
public static void main(String... args) {
|
public static void main(String... args) {
|
||||||
Config config = ConfigFactory.parseString(
|
Config config = ConfigFactory.parseString("akka.loglevel = DEBUG \n" + "akka.actor.debug.lifecycle = on");
|
||||||
"akka.loglevel = DEBUG \n" +
|
|
||||||
"akka.actor.debug { \n" +
|
|
||||||
" lifecycle = on \n" +
|
|
||||||
"}");
|
|
||||||
|
|
||||||
ActorSystem system = ActorSystem.create("FaultToleranceSample", config);
|
ActorSystem system = ActorSystem.create("FaultToleranceSample", config);
|
||||||
ActorRef worker = system.actorOf(new Props(Worker.class), "worker");
|
ActorRef worker = system.actorOf(new Props(Worker.class), "worker");
|
||||||
ActorRef listener = system.actorOf(new Props(Listener.class), "listener");
|
ActorRef listener = system.actorOf(new Props(Listener.class), "listener");
|
||||||
// start the work and listen on progress
|
// start the work and listen on progress
|
||||||
|
// note that the listener is used as sender of the tell,
|
||||||
|
// i.e. it will receive replies from the worker
|
||||||
worker.tell(Start, listener);
|
worker.tell(Start, listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -76,6 +79,14 @@ public class FaultHandlingDocSample {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
public interface WorkerApi {
|
||||||
|
public static final Object Start = "Start";
|
||||||
|
public static final Object Do = "Do";
|
||||||
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Worker performs some work when it receives the Start message. It will
|
* Worker performs some work when it receives the Start message. It will
|
||||||
* continuously notify the sender of the Start message of current progress.
|
* continuously notify the sender of the Start message of current progress.
|
||||||
|
|
@ -125,6 +136,47 @@ public class FaultHandlingDocSample {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
public interface CounterServiceApi {
|
||||||
|
|
||||||
|
public static final Object GetCurrentCount = "GetCurrentCount";
|
||||||
|
|
||||||
|
public static class CurrentCount {
|
||||||
|
public final String key;
|
||||||
|
public final long count;
|
||||||
|
|
||||||
|
public CurrentCount(String key, long count) {
|
||||||
|
this.key = key;
|
||||||
|
this.count = count;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s(%s, %s)", getClass().getSimpleName(), key, count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Increment {
|
||||||
|
public final long n;
|
||||||
|
|
||||||
|
public Increment(long n) {
|
||||||
|
this.n = n;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s(%s)", getClass().getSimpleName(), n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ServiceUnavailable extends RuntimeException {
|
||||||
|
public ServiceUnavailable(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds the value received in Increment message to a persistent counter.
|
* Adds the value received in Increment message to a persistent counter.
|
||||||
* Replies with CurrentCount when it is asked for CurrentCount. CounterService
|
* Replies with CurrentCount when it is asked for CurrentCount. CounterService
|
||||||
|
|
@ -132,6 +184,9 @@ public class FaultHandlingDocSample {
|
||||||
*/
|
*/
|
||||||
public static class CounterService extends UntypedActor {
|
public static class CounterService extends UntypedActor {
|
||||||
|
|
||||||
|
// Reconnect message
|
||||||
|
static final Object Reconnect = "Reconnect";
|
||||||
|
|
||||||
private static class SenderMsgPair {
|
private static class SenderMsgPair {
|
||||||
final ActorRef sender;
|
final ActorRef sender;
|
||||||
final Object msg;
|
final Object msg;
|
||||||
|
|
@ -241,6 +296,23 @@ public class FaultHandlingDocSample {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
public interface CounterApi {
|
||||||
|
public static class UseStorage {
|
||||||
|
public final ActorRef storage;
|
||||||
|
|
||||||
|
public UseStorage(ActorRef storage) {
|
||||||
|
this.storage = storage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s(%s)", getClass().getSimpleName(), storage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//#messages
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The in memory count variable that will send current value to the Storage,
|
* The in memory count variable that will send current value to the Storage,
|
||||||
* if there is any storage available at the moment.
|
* if there is any storage available at the moment.
|
||||||
|
|
@ -281,87 +353,8 @@ public class FaultHandlingDocSample {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Saves key/value pairs to persistent storage when receiving Store message.
|
|
||||||
* Replies with current value when receiving Get message. Will throw
|
|
||||||
* StorageException if the underlying data store is out of order.
|
|
||||||
*/
|
|
||||||
public static class Storage extends UntypedActor {
|
|
||||||
|
|
||||||
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
|
||||||
final DummyDB db = DummyDB.instance;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onReceive(Object msg) {
|
|
||||||
log.debug("received message {}", msg);
|
|
||||||
if (msg instanceof Store) {
|
|
||||||
Store store = (Store) msg;
|
|
||||||
db.save(store.entry.key, store.entry.value);
|
|
||||||
} else if (msg instanceof Get) {
|
|
||||||
Get get = (Get) msg;
|
|
||||||
Long value = db.load(get.key);
|
|
||||||
getSender().tell(new Entry(get.key, value == null ? Long.valueOf(0L) : value), getSelf());
|
|
||||||
} else {
|
|
||||||
unhandled(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//#messages
|
//#messages
|
||||||
public static final Object Start = "Start";
|
public interface StorageApi {
|
||||||
public static final Object Do = "Do";
|
|
||||||
public static final Object GetCurrentCount = "GetCurrentCount";
|
|
||||||
public static final Object Reconnect = "Reconnect";
|
|
||||||
|
|
||||||
public static class CurrentCount {
|
|
||||||
public final String key;
|
|
||||||
public final long count;
|
|
||||||
|
|
||||||
public CurrentCount(String key, long count) {
|
|
||||||
this.key = key;
|
|
||||||
this.count = count;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String toString() {
|
|
||||||
return String.format("%s(%s, %s)", getClass().getSimpleName(), key, count);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Increment {
|
|
||||||
public final long n;
|
|
||||||
|
|
||||||
public Increment(long n) {
|
|
||||||
this.n = n;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String toString() {
|
|
||||||
return String.format("%s(%s)", getClass().getSimpleName(), n);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class UseStorage {
|
|
||||||
public final ActorRef storage;
|
|
||||||
|
|
||||||
public UseStorage(ActorRef storage) {
|
|
||||||
this.storage = storage;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String toString() {
|
|
||||||
return String.format("%s(%s)", getClass().getSimpleName(), storage);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class ServiceUnavailable extends RuntimeException {
|
|
||||||
public ServiceUnavailable(String msg) {
|
|
||||||
super(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class StorageException extends RuntimeException {
|
|
||||||
public StorageException(String msg) {
|
|
||||||
super(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Store {
|
public static class Store {
|
||||||
public final Entry entry;
|
public final Entry entry;
|
||||||
|
|
@ -401,8 +394,41 @@ public class FaultHandlingDocSample {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class StorageException extends RuntimeException {
|
||||||
|
public StorageException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//#messages
|
//#messages
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Saves key/value pairs to persistent storage when receiving Store message.
|
||||||
|
* Replies with current value when receiving Get message. Will throw
|
||||||
|
* StorageException if the underlying data store is out of order.
|
||||||
|
*/
|
||||||
|
public static class Storage extends UntypedActor {
|
||||||
|
|
||||||
|
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||||
|
final DummyDB db = DummyDB.instance;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onReceive(Object msg) {
|
||||||
|
log.debug("received message {}", msg);
|
||||||
|
if (msg instanceof Store) {
|
||||||
|
Store store = (Store) msg;
|
||||||
|
db.save(store.entry.key, store.entry.value);
|
||||||
|
} else if (msg instanceof Get) {
|
||||||
|
Get get = (Get) msg;
|
||||||
|
Long value = db.load(get.key);
|
||||||
|
getSender().tell(new Entry(get.key, value == null ? Long.valueOf(0L) : value), getSelf());
|
||||||
|
} else {
|
||||||
|
unhandled(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//#dummydb
|
//#dummydb
|
||||||
public static class DummyDB {
|
public static class DummyDB {
|
||||||
public static final DummyDB instance = new DummyDB();
|
public static final DummyDB instance = new DummyDB();
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,8 @@ object FaultHandlingDocSample extends App {
|
||||||
val worker = system.actorOf(Props[Worker], name = "worker")
|
val worker = system.actorOf(Props[Worker], name = "worker")
|
||||||
val listener = system.actorOf(Props[Listener], name = "listener")
|
val listener = system.actorOf(Props[Listener], name = "listener")
|
||||||
// start the work and listen on progress
|
// start the work and listen on progress
|
||||||
|
// note that the listener is used as sender of the tell,
|
||||||
|
// i.e. it will receive replies from the worker
|
||||||
worker.tell(Start, sender = listener)
|
worker.tell(Start, sender = listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -107,8 +109,9 @@ object CounterService {
|
||||||
case class Increment(n: Int)
|
case class Increment(n: Int)
|
||||||
case object GetCurrentCount
|
case object GetCurrentCount
|
||||||
case class CurrentCount(key: String, count: Long)
|
case class CurrentCount(key: String, count: Long)
|
||||||
case object Reconnect
|
|
||||||
class ServiceUnavailable(msg: String) extends RuntimeException(msg)
|
class ServiceUnavailable(msg: String) extends RuntimeException(msg)
|
||||||
|
|
||||||
|
private case object Reconnect
|
||||||
}
|
}
|
||||||
//#messages
|
//#messages
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue