From 1d4d65d449000b68a29410a8d54ab81af32e0347 Mon Sep 17 00:00:00 2001 From: RickLatrine Date: Sat, 8 Dec 2012 11:27:07 +0100 Subject: [PATCH] Moved example code, fixed indentation in SupervisedAsk --- .../java/code/docs/pattern/SupervisedAsk.java | 153 +++++++++--------- .../code/docs/pattern/SupervisedAskSpec.java | 23 +++ akka-docs/rst/java/howto.rst | 3 +- 3 files changed, 100 insertions(+), 79 deletions(-) create mode 100644 akka-docs/rst/java/code/docs/pattern/SupervisedAskSpec.java diff --git a/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java b/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java index 089f2b77ef..17151595a0 100644 --- a/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java +++ b/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java @@ -1,90 +1,87 @@ +package docs.pattern; + public class SupervisedAsk { - private static class AskParam { - Props props; - Object message; - Timeout timeout; - AskParam(Props props, Object message, Timeout timeout) { - this.props = props; - this.message = message; - this.timeout = timeout; - } + private static class AskParam { + Props props; + Object message; + Timeout timeout; + + AskParam(Props props, Object message, Timeout timeout) { + this.props = props; + this.message = message; + this.timeout = timeout; } + } - private static class AskTimeout {} - - public static class AskSupervisorCreator extends UntypedActor { + private static class AskTimeout { + } - @Override - public void onReceive(Object message) throws Exception { - if (message instanceof AskParam) { - ActorRef supervisor = getContext().actorOf( - Props.apply(AskSupervisor.class)); - supervisor.forward(message, getContext()); - } else { - unhandled(message); + public static class AskSupervisorCreator extends UntypedActor { + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof AskParam) { + ActorRef supervisor = getContext().actorOf( + Props.apply(AskSupervisor.class)); + supervisor.forward(message, getContext()); + } else { + unhandled(message); + } + } + } + + public static class AskSupervisor extends UntypedActor { + private ActorRef targetActor; + private ActorRef caller; + private AskParam askParam; + private Cancellable timeoutMessage; + + @Override + public SupervisorStrategy supervisorStrategy() { + return new OneForOneStrategy(0, Duration.Zero(), + new Function() { + public Directive apply(Throwable cause) { + caller.tell(new Status.Failure(cause)); + return SupervisorStrategy.stop(); } - } + }); } - public static class AskSupervisor extends UntypedActor { - private ActorRef targetActor; - private ActorRef caller; - private AskParam askParam; - private Cancellable timeoutMessage; - - @Override - public SupervisorStrategy supervisorStrategy() { - return new OneForOneStrategy(0, Duration.Zero(), - new Function() { - public Directive apply(Throwable cause) { - caller.tell(new Status.Failure(cause)); - return SupervisorStrategy.stop(); - } - }); - } - - @Override - public void onReceive(Object message) throws Exception { - if (message instanceof AskParam) { - askParam = (AskParam) message; - caller = getSender(); - targetActor = getContext().actorOf(askParam.props); - getContext().watch(targetActor); - targetActor.forward(askParam.message, getContext()); - Scheduler scheduler = getContext().system().scheduler(); - timeoutMessage = scheduler.scheduleOnce(askParam.timeout.duration(), self(), new AskTimeout()); - } else if (message instanceof Terminated) { - Throwable ex = new ActorKilledException("Target actor terminated."); - caller.tell(new Status.Failure(ex)); - timeoutMessage.cancel(); - getContext().stop(self()); - } else if (message instanceof AskTimeout) { - Throwable ex = new TimeoutException("Target actor timed out after " + askParam.timeout.toString()); - caller.tell(new Status.Failure(ex)); - getContext().stop(self()); - } else - unhandled(message); - } + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof AskParam) { + askParam = (AskParam) message; + caller = getSender(); + targetActor = getContext().actorOf(askParam.props); + getContext().watch(targetActor); + targetActor.forward(askParam.message, getContext()); + Scheduler scheduler = getContext().system().scheduler(); + timeoutMessage = scheduler.scheduleOnce(askParam.timeout.duration(), + self(), new AskTimeout()); + } else if (message instanceof Terminated) { + Throwable ex = new ActorKilledException("Target actor terminated."); + caller.tell(new Status.Failure(ex)); + timeoutMessage.cancel(); + getContext().stop(self()); + } else if (message instanceof AskTimeout) { + Throwable ex = new TimeoutException("Target actor timed out after " + + askParam.timeout.toString()); + caller.tell(new Status.Failure(ex)); + getContext().stop(self()); + } else + unhandled(message); } + } - public static Future askOf(ActorRef supervisorCreator, - Props props, Object message, Timeout timeout) { - AskParam param = new AskParam(props, message, timeout); - return Patterns.ask(supervisorCreator, param, timeout); - } + public static Future askOf(ActorRef supervisorCreator, Props props, + Object message, Timeout timeout) { + AskParam param = new AskParam(props, message, timeout); + return Patterns.ask(supervisorCreator, param, timeout); + } - synchronized public static ActorRef createSupervisorCreator(ActorRefFactory factory) { - return factory.actorOf(Props.apply(AskSupervisorCreator.class)); - } -} - -// example usage -try { - ActorRef supervisorCreator = SupervisedAsk.createSupervisorCreator(actorSystem); - Future finished = SupervisedAsk.askOf(supervisorCreator, - Props.apply(SomeActor.class), message, timeout); - SomeResult result = (SomeResult) Await.result(finished, timeout.duration()); -} catch (Exception e) { - // exception propagated by supervision + synchronized public static ActorRef createSupervisorCreator( + ActorRefFactory factory) { + return factory.actorOf(Props.apply(AskSupervisorCreator.class)); + } } \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/pattern/SupervisedAskSpec.java b/akka-docs/rst/java/code/docs/pattern/SupervisedAskSpec.java new file mode 100644 index 0000000000..f1d1af32f6 --- /dev/null +++ b/akka-docs/rst/java/code/docs/pattern/SupervisedAskSpec.java @@ -0,0 +1,23 @@ +package docs.pattern; + +import docs.testkit.TestKitSampleTest.SomeActor; +import scala.actors.Future; +import actor.ActorRef; +import actor.Props; + +public class SupervisedAskSpec { + + public void execute() { + // example usage + try { + ActorRef supervisorCreator = SupervisedAsk + .createSupervisorCreator(actorSystem); + Future finished = SupervisedAsk.askOf(supervisorCreator, + Props.apply(SomeActor.class), message, timeout); + Object result = Await.result(finished, + timeout.duration()); + } catch (Exception e) { + // exception propagated by supervision + } + } +} diff --git a/akka-docs/rst/java/howto.rst b/akka-docs/rst/java/howto.rst index cdb308143b..a1f8e4a11f 100644 --- a/akka-docs/rst/java/howto.rst +++ b/akka-docs/rst/java/howto.rst @@ -24,7 +24,7 @@ Single-Use Actor Trees with High-Level Error Reporting A nice way to enter the actor world from java is the use of Patterns.ask(). This method starts a temporary actor to forward the message and collect the result from the actor to be "asked". In case of errors within the asked actor the default supervision handling will take over. -The caller of Patterns.ask() will not be notified. +The caller of Patterns.ask() will *not* be notified. If that caller is interested in such an exception, he must make sure that the asked actor replies with Status.Failure(Throwable). Behind the asked actor a complex actor hierarchy might be spawned to accomplish asynchronous work. @@ -51,6 +51,7 @@ Afterwards the actor hierarchy is stopped. Finally we are able to execute an actor and receive the results or exceptions. +.. includecode:: code/docs/pattern/SupervisedAskSpec.java Template Pattern ================