pekko/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java
Patrik Nordwall 4bd6b7aab1 improve AbstractActor, #21717
* Receive class that wraps PartialFunction, to avoid
  scary scala types
* move AbstractActorContext to AbstractActor.ActorContext
* converting docs, many, many UntypedActor
* removing UntypedActor docs
* add unit test for ReceiveBuilder
* MiMa filters
* consistent use of getContext(), self(), sender()
* rename cross references
* migration guide
* skip samples for now
* improve match type safety, add matchUnchecked
  * the `? extends P` caused code like this to compile:
    `match(String.class, (Integer i) -> {})`
  * added matchUnchecked, since it can still be useful (um, convenient)
    to be able to do:
    `matchUnchecked(List.class, (List<String> list) -> {})`
* eliminate some scala.Option
  * preRestart
  * findChild
  * ActorIdentity.getActorRef
2017-01-23 18:30:52 +01:00

109 lines
No EOL
3.5 KiB
Java

package docs.pattern;
import java.util.concurrent.TimeoutException;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.actor.Actor;
import akka.actor.ActorKilledException;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Scheduler;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.actor.SupervisorStrategy.Directive;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.japi.Function;
import akka.pattern.Patterns;
import akka.util.Timeout;
public class SupervisedAsk {
private static class AskParam {
Props props;
Object message;
Timeout timeout;
AskParam(Props props, Object message, Timeout timeout) {
this.props = props;
this.message = message;
this.timeout = timeout;
}
}
private static class AskTimeout {
}
public static class AskSupervisorCreator extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof AskParam) {
ActorRef supervisor = getContext().actorOf(
Props.create(AskSupervisor.class));
supervisor.forward(message, getContext());
} else {
unhandled(message);
}
}
}
public static class AskSupervisor extends UntypedActor {
private ActorRef targetActor;
private ActorRef caller;
private AskParam askParam;
private Cancellable timeoutMessage;
@Override
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(0, Duration.Zero(),
new Function<Throwable, Directive>() {
public Directive apply(Throwable cause) {
caller.tell(new Status.Failure(cause), self());
return SupervisorStrategy.stop();
}
});
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof AskParam) {
askParam = (AskParam) message;
caller = sender();
targetActor = getContext().actorOf(askParam.props);
getContext().watch(targetActor);
targetActor.forward(askParam.message, getContext());
Scheduler scheduler = getContext().system().scheduler();
timeoutMessage = scheduler.scheduleOnce(askParam.timeout.duration(),
self(), new AskTimeout(), getContext().dispatcher(), null);
} else if (message instanceof Terminated) {
Throwable ex = new ActorKilledException("Target actor terminated.");
caller.tell(new Status.Failure(ex), self());
timeoutMessage.cancel();
getContext().stop(self());
} else if (message instanceof AskTimeout) {
Throwable ex = new TimeoutException("Target actor timed out after "
+ askParam.timeout.toString());
caller.tell(new Status.Failure(ex), self());
getContext().stop(self());
} else
unhandled(message);
}
}
public static Future<Object> askOf(ActorRef supervisorCreator, Props props,
Object message, Timeout timeout) {
AskParam param = new AskParam(props, message, timeout);
return Patterns.ask(supervisorCreator, param, timeout);
}
synchronized public static ActorRef createSupervisorCreator(
ActorRefFactory factory) {
return factory.actorOf(Props.create(AskSupervisorCreator.class));
}
}