diff --git a/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java b/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java index 8665f3777b..265f005059 100644 --- a/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java @@ -35,7 +35,7 @@ public class FaultHandlingDocSample { /** * Runs the sample */ - public static void main(String... args) { + public static void main(String[] args) { Config config = ConfigFactory.parseString("akka.loglevel = DEBUG \n" + "akka.actor.debug.lifecycle = on"); ActorSystem system = ActorSystem.create("FaultToleranceSample", config); @@ -62,11 +62,11 @@ public class FaultHandlingDocSample { public void onReceive(Object msg) { log.debug("received message {}", msg); - if (msg instanceof CurrentCount) { - CurrentCount current = (CurrentCount) msg; - log.info("Current count for [{}] is [{}]", current.key, current.count); - if (current.count > 50) { - log.info("That's enough, shutting down"); + if (msg instanceof Progress) { + Progress progress = (Progress) msg; + log.info("Current progress: {} %", progress.percent); + if (progress.percent >= 100.0) { + log.info("That's all, shutting down"); getContext().system().shutdown(); } } else if (msg == Actors.receiveTimeout()) { @@ -83,13 +83,25 @@ public class FaultHandlingDocSample { public interface WorkerApi { public static final Object Start = "Start"; public static final Object Do = "Do"; + + public static class Progress { + public final double percent; + + public Progress(double percent) { + this.percent = percent; + } + + public String toString() { + return String.format("%s(%s)", getClass().getSimpleName(), percent); + } + } } //#messages /** * Worker performs some work when it receives the Start message. It will - * continuously notify the sender of the Start message of current progress. + * continuously notify the sender of the Start message of current Progress. * The Worker supervise the CounterService. */ public static class Worker extends UntypedActor { @@ -99,6 +111,7 @@ public class FaultHandlingDocSample { // The sender of the initial Start message will continuously be notified about progress ActorRef progressListener; final ActorRef counterService = getContext().actorOf(new Props(CounterService.class), "counter"); + final int totalCount = 51; // Stop the CounterService child if it throws ServiceUnavailable private static SupervisorStrategy strategy = new OneForOneStrategy(-1, Duration.Inf(), @@ -128,8 +141,12 @@ public class FaultHandlingDocSample { counterService.tell(new Increment(1), getSelf()); counterService.tell(new Increment(1), getSelf()); - // Send current count to the initial sender - pipeTo(ask(counterService, GetCurrentCount, askTimeout), progressListener); + // Send current progress to the initial sender + pipeTo(ask(counterService, GetCurrentCount, askTimeout).map(new Function() { + public Progress apply(CurrentCount c) { + return new Progress(100.0 * c.count / totalCount); + } + }), progressListener); } else { unhandled(msg); } diff --git a/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSample.scala b/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSample.scala index 99beab0cc3..fbdf3e25b9 100644 --- a/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSample.scala +++ b/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSample.scala @@ -43,15 +43,15 @@ object FaultHandlingDocSample extends App { * work has been done. */ class Listener extends Actor with ActorLogging { - import CounterService._ + import Worker._ // If we don't get any progress within 15 seconds then the service is unavailable context.setReceiveTimeout(15 seconds) def receive = { - case CurrentCount(key, count) ⇒ - log.info("Current count for [{}] is [{}]", key, count) - if (count > 50) { - log.info("That's enough, shutting down") + case Progress(percent) ⇒ + log.info("Current progress: {} %", percent) + if (percent >= 100.0) { + log.info("That's all, shutting down") context.system.shutdown() } @@ -66,13 +66,14 @@ class Listener extends Actor with ActorLogging { object Worker { case object Start case object Do + case class Progress(percent: Double) } //#messages /** * Worker performs some work when it receives the `Start` message. * It will continuously notify the sender of the `Start` message - * of current progress. The `Worker` supervise the `CounterService`. + * of current ``Progress``. The `Worker` supervise the `CounterService`. */ class Worker extends Actor with ActorLogging { import Worker._ @@ -87,6 +88,7 @@ class Worker extends Actor with ActorLogging { // The sender of the initial Start message will continuously be notified about progress var progressListener: Option[ActorRef] = None val counterService = context.actorOf(Props[CounterService], name = "counter") + val totalCount = 51 def receive = LoggingReceive { case Start if progressListener.isEmpty ⇒ @@ -98,8 +100,10 @@ class Worker extends Actor with ActorLogging { counterService ! Increment(1) counterService ! Increment(1) - // Send current count to the initial sender - counterService ? GetCurrentCount pipeTo progressListener.get + // Send current progress to the initial sender + counterService ? GetCurrentCount map { + case CurrentCount(_, count) ⇒ Progress(100.0 * count / totalCount) + } pipeTo progressListener.get } }