DOC: Better use of pipeTo in FaultHandlingDocSample

This commit is contained in:
Patrik Nordwall 2012-01-30 11:56:09 +01:00
parent ebec797322
commit c1178c9b1a
2 changed files with 38 additions and 17 deletions

View file

@ -35,7 +35,7 @@ public class FaultHandlingDocSample {
/**
* Runs the sample
*/
public static void main(String... args) {
public static void main(String[] args) {
Config config = ConfigFactory.parseString("akka.loglevel = DEBUG \n" + "akka.actor.debug.lifecycle = on");
ActorSystem system = ActorSystem.create("FaultToleranceSample", config);
@ -62,11 +62,11 @@ public class FaultHandlingDocSample {
public void onReceive(Object msg) {
log.debug("received message {}", msg);
if (msg instanceof CurrentCount) {
CurrentCount current = (CurrentCount) msg;
log.info("Current count for [{}] is [{}]", current.key, current.count);
if (current.count > 50) {
log.info("That's enough, shutting down");
if (msg instanceof Progress) {
Progress progress = (Progress) msg;
log.info("Current progress: {} %", progress.percent);
if (progress.percent >= 100.0) {
log.info("That's all, shutting down");
getContext().system().shutdown();
}
} else if (msg == Actors.receiveTimeout()) {
@ -83,13 +83,25 @@ public class FaultHandlingDocSample {
public interface WorkerApi {
public static final Object Start = "Start";
public static final Object Do = "Do";
public static class Progress {
public final double percent;
public Progress(double percent) {
this.percent = percent;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), percent);
}
}
}
//#messages
/**
* Worker performs some work when it receives the Start message. It will
* continuously notify the sender of the Start message of current progress.
* continuously notify the sender of the Start message of current Progress.
* The Worker supervise the CounterService.
*/
public static class Worker extends UntypedActor {
@ -99,6 +111,7 @@ public class FaultHandlingDocSample {
// The sender of the initial Start message will continuously be notified about progress
ActorRef progressListener;
final ActorRef counterService = getContext().actorOf(new Props(CounterService.class), "counter");
final int totalCount = 51;
// Stop the CounterService child if it throws ServiceUnavailable
private static SupervisorStrategy strategy = new OneForOneStrategy(-1, Duration.Inf(),
@ -128,8 +141,12 @@ public class FaultHandlingDocSample {
counterService.tell(new Increment(1), getSelf());
counterService.tell(new Increment(1), getSelf());
// Send current count to the initial sender
pipeTo(ask(counterService, GetCurrentCount, askTimeout), progressListener);
// Send current progress to the initial sender
pipeTo(ask(counterService, GetCurrentCount, askTimeout).map(new Function<CurrentCount, Progress>() {
public Progress apply(CurrentCount c) {
return new Progress(100.0 * c.count / totalCount);
}
}), progressListener);
} else {
unhandled(msg);
}

View file

@ -43,15 +43,15 @@ object FaultHandlingDocSample extends App {
* work has been done.
*/
class Listener extends Actor with ActorLogging {
import CounterService._
import Worker._
// If we don't get any progress within 15 seconds then the service is unavailable
context.setReceiveTimeout(15 seconds)
def receive = {
case CurrentCount(key, count)
log.info("Current count for [{}] is [{}]", key, count)
if (count > 50) {
log.info("That's enough, shutting down")
case Progress(percent)
log.info("Current progress: {} %", percent)
if (percent >= 100.0) {
log.info("That's all, shutting down")
context.system.shutdown()
}
@ -66,13 +66,14 @@ class Listener extends Actor with ActorLogging {
object Worker {
case object Start
case object Do
case class Progress(percent: Double)
}
//#messages
/**
* Worker performs some work when it receives the `Start` message.
* It will continuously notify the sender of the `Start` message
* of current progress. The `Worker` supervise the `CounterService`.
* of current ``Progress``. The `Worker` supervise the `CounterService`.
*/
class Worker extends Actor with ActorLogging {
import Worker._
@ -87,6 +88,7 @@ class Worker extends Actor with ActorLogging {
// The sender of the initial Start message will continuously be notified about progress
var progressListener: Option[ActorRef] = None
val counterService = context.actorOf(Props[CounterService], name = "counter")
val totalCount = 51
def receive = LoggingReceive {
case Start if progressListener.isEmpty
@ -98,8 +100,10 @@ class Worker extends Actor with ActorLogging {
counterService ! Increment(1)
counterService ! Increment(1)
// Send current count to the initial sender
counterService ? GetCurrentCount pipeTo progressListener.get
// Send current progress to the initial sender
counterService ? GetCurrentCount map {
case CurrentCount(_, count) Progress(100.0 * count / totalCount)
} pipeTo progressListener.get
}
}