+act #3532: Add ask support for Java API
This commit is contained in:
parent
ffea36a8c8
commit
5e3dcaa0da
4 changed files with 141 additions and 7 deletions
|
|
@ -1,8 +1,10 @@
|
|||
package akka.actor;
|
||||
|
||||
public class JavaAPITestActor extends UntypedActor {
|
||||
public static String ANSWER = "got it!";
|
||||
|
||||
public void onReceive(Object msg) {
|
||||
getSender().tell("got it!", getSelf());
|
||||
getSender().tell(ANSWER, getSelf());
|
||||
getContext().getChildren();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,58 @@
|
|||
package akka.pattern;
|
||||
|
||||
import akka.actor.*;
|
||||
import akka.dispatch.Futures;
|
||||
import akka.testkit.AkkaJUnitActorSystemResource;
|
||||
import akka.testkit.AkkaSpec;
|
||||
import akka.testkit.TestProbe;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.duration.Duration;
|
||||
|
||||
import static akka.pattern.Patterns.ask;
|
||||
import static akka.pattern.Patterns.pipe;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
public class PatternsTest {
|
||||
|
||||
@ClassRule
|
||||
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("JavaAPI",
|
||||
AkkaSpec.testConf());
|
||||
|
||||
private final ActorSystem system = actorSystemResource.getSystem();
|
||||
|
||||
|
||||
@Test
|
||||
public void useAsk() throws Exception {
|
||||
ActorRef testActor = system.actorOf(Props.create(JavaAPITestActor.class), "test");
|
||||
assertEquals("Ask should return expected answer",
|
||||
JavaAPITestActor.ANSWER, Await.result(ask(testActor, "hey!", 3000), Duration.create(3, "seconds")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void useAskWithActorSelection() throws Exception {
|
||||
ActorRef testActor = system.actorOf(Props.create(JavaAPITestActor.class), "test2");
|
||||
ActorSelection selection = system.actorSelection("/user/test2");
|
||||
ActorIdentity id = (ActorIdentity) Await.result(ask(selection, new Identify("yo!"), 3000), Duration.create(3, "seconds"));
|
||||
assertEquals("Ask (Identify) should return the proper ActorIdentity", testActor, id.getRef());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void usePipe() throws Exception {
|
||||
TestProbe probe = new TestProbe(system);
|
||||
pipe(Futures.successful("ho!"), system.dispatcher()).to(probe.ref());
|
||||
probe.expectMsg("ho!");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void usePipeWithActorSelection() throws Exception {
|
||||
TestProbe probe = new TestProbe(system);
|
||||
ActorSelection selection = system.actorSelection(probe.ref().path());
|
||||
pipe(Futures.successful("hi!"), system.dispatcher()).to(selection);
|
||||
probe.expectMsg("hi!");
|
||||
}
|
||||
}
|
||||
|
|
@ -5,13 +5,12 @@ package akka.pattern
|
|||
|
||||
import language.postfixOps
|
||||
|
||||
import akka.actor._
|
||||
import akka.testkit.AkkaSpec
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Await
|
||||
import akka.testkit.DefaultTimeout
|
||||
import akka.util.Timeout
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Failure
|
||||
import akka.actor.{ Actor, Props, ActorRef }
|
||||
|
||||
class AskSpec extends AkkaSpec {
|
||||
|
||||
|
|
@ -68,6 +67,16 @@ class AskSpec extends AkkaSpec {
|
|||
}.getMessage must be === expectedMsg
|
||||
}
|
||||
|
||||
"work for ActorSelection" in {
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
import system.dispatcher
|
||||
val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender ! x } }), "select-echo")
|
||||
val identityFuture = (system.actorSelection("/user/select-echo") ? Identify(None))
|
||||
.mapTo[ActorIdentity].map(_.ref.get)
|
||||
|
||||
Await.result(identityFuture, 5 seconds) must be === echo
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
*/
|
||||
package akka.pattern
|
||||
|
||||
import akka.actor.Scheduler
|
||||
import akka.actor.{ ActorSelection, Scheduler }
|
||||
import scala.concurrent.ExecutionContext
|
||||
import java.util.concurrent.Callable
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
|
@ -75,7 +75,72 @@ object Patterns {
|
|||
* });
|
||||
* }}}
|
||||
*/
|
||||
def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[AnyRef] = scalaAsk(actor, message)(new Timeout(timeoutMillis)).asInstanceOf[Future[AnyRef]]
|
||||
def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[AnyRef] =
|
||||
scalaAsk(actor, message)(new Timeout(timeoutMillis)).asInstanceOf[Future[AnyRef]]
|
||||
|
||||
/**
|
||||
* <i>Java API for `akka.pattern.ask`:</i>
|
||||
* Sends a message asynchronously and returns a [[scala.concurrent.Future]]
|
||||
* holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]]
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s object, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*
|
||||
* <b>Recommended usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* final Future<Object> f = Patterns.ask(selection, request, timeout);
|
||||
* f.onSuccess(new Procedure<Object>() {
|
||||
* public void apply(Object o) {
|
||||
* nextActor.tell(new EnrichedResult(request, o));
|
||||
* }
|
||||
* });
|
||||
* }}}
|
||||
*/
|
||||
def ask(selection: ActorSelection, message: Any, timeout: Timeout): Future[AnyRef] =
|
||||
scalaAsk(selection, message)(timeout).asInstanceOf[Future[AnyRef]]
|
||||
|
||||
/**
|
||||
* <i>Java API for `akka.pattern.ask`:</i>
|
||||
* Sends a message asynchronously and returns a [[scala.concurrent.Future]]
|
||||
* holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]]
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s object, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*
|
||||
* <b>Recommended usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* final Future<Object> f = Patterns.ask(selection, request, timeout);
|
||||
* f.onSuccess(new Procedure<Object>() {
|
||||
* public void apply(Object o) {
|
||||
* nextActor.tell(new EnrichedResult(request, o));
|
||||
* }
|
||||
* });
|
||||
* }}}
|
||||
*/
|
||||
def ask(selection: ActorSelection, message: Any, timeoutMillis: Long): Future[AnyRef] =
|
||||
scalaAsk(selection, message)(new Timeout(timeoutMillis)).asInstanceOf[Future[AnyRef]]
|
||||
|
||||
/**
|
||||
* Register an onComplete callback on this [[scala.concurrent.Future]] to send
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue