Imports fixed. Corrected akka 2.1 releated changes.
This commit is contained in:
parent
3f1bab2328
commit
0dde5c484a
2 changed files with 9 additions and 8 deletions
|
|
@ -2,6 +2,9 @@ package docs.pattern;
|
|||
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import scala.concurrent.Future;
|
||||
import scala.concurrent.duration.Duration;
|
||||
|
||||
import akka.actor.ActorKilledException;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorRefFactory;
|
||||
|
|
@ -14,10 +17,8 @@ import akka.actor.SupervisorStrategy;
|
|||
import akka.actor.SupervisorStrategy.Directive;
|
||||
import akka.actor.Terminated;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.dispatch.Future;
|
||||
import akka.japi.Function;
|
||||
import akka.pattern.Patterns;
|
||||
import akka.util.Duration;
|
||||
import akka.util.Timeout;
|
||||
|
||||
public class SupervisedAsk {
|
||||
|
|
@ -62,7 +63,7 @@ public class SupervisedAsk {
|
|||
return new OneForOneStrategy(0, Duration.Zero(),
|
||||
new Function<Throwable, Directive>() {
|
||||
public Directive apply(Throwable cause) {
|
||||
caller.tell(new Status.Failure(cause));
|
||||
caller.tell(new Status.Failure(cause), self());
|
||||
return SupervisorStrategy.stop();
|
||||
}
|
||||
});
|
||||
|
|
@ -78,16 +79,16 @@ public class SupervisedAsk {
|
|||
targetActor.forward(askParam.message, getContext());
|
||||
Scheduler scheduler = getContext().system().scheduler();
|
||||
timeoutMessage = scheduler.scheduleOnce(askParam.timeout.duration(),
|
||||
self(), new AskTimeout());
|
||||
self(), new AskTimeout(), context().dispatcher());
|
||||
} else if (message instanceof Terminated) {
|
||||
Throwable ex = new ActorKilledException("Target actor terminated.");
|
||||
caller.tell(new Status.Failure(ex));
|
||||
caller.tell(new Status.Failure(ex), self());
|
||||
timeoutMessage.cancel();
|
||||
getContext().stop(self());
|
||||
} else if (message instanceof AskTimeout) {
|
||||
Throwable ex = new TimeoutException("Target actor timed out after "
|
||||
+ askParam.timeout.toString());
|
||||
caller.tell(new Status.Failure(ex));
|
||||
caller.tell(new Status.Failure(ex), self());
|
||||
getContext().stop(self());
|
||||
} else
|
||||
unhandled(message);
|
||||
|
|
|
|||
|
|
@ -1,11 +1,11 @@
|
|||
package docs.pattern;
|
||||
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.Future;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorRefFactory;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.dispatch.Await;
|
||||
import akka.dispatch.Future;
|
||||
import akka.util.Timeout;
|
||||
|
||||
public class SupervisedAskSpec {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue