diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java index e84f45f885..aa1b870efb 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java @@ -1,8 +1,10 @@ package akka.actor; public class JavaAPITestActor extends UntypedActor { + public static String ANSWER = "got it!"; + public void onReceive(Object msg) { - getSender().tell("got it!", getSelf()); + getSender().tell(ANSWER, getSelf()); getContext().getChildren(); } } diff --git a/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java b/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java new file mode 100644 index 0000000000..033a110f0c --- /dev/null +++ b/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java @@ -0,0 +1,58 @@ +package akka.pattern; + +import akka.actor.*; +import akka.dispatch.Futures; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import akka.testkit.TestProbe; +import org.junit.ClassRule; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; + +import static akka.pattern.Patterns.ask; +import static akka.pattern.Patterns.pipe; +import static org.junit.Assert.assertEquals; + +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +public class PatternsTest { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("JavaAPI", + AkkaSpec.testConf()); + + private final ActorSystem system = actorSystemResource.getSystem(); + + + @Test + public void useAsk() throws Exception { + ActorRef testActor = system.actorOf(Props.create(JavaAPITestActor.class), "test"); + assertEquals("Ask should return expected answer", + JavaAPITestActor.ANSWER, Await.result(ask(testActor, "hey!", 3000), Duration.create(3, "seconds"))); + } + + @Test + public void useAskWithActorSelection() throws Exception { + ActorRef testActor = system.actorOf(Props.create(JavaAPITestActor.class), "test2"); + ActorSelection selection = system.actorSelection("/user/test2"); + ActorIdentity id = (ActorIdentity) Await.result(ask(selection, new Identify("yo!"), 3000), Duration.create(3, "seconds")); + assertEquals("Ask (Identify) should return the proper ActorIdentity", testActor, id.getRef()); + } + + @Test + public void usePipe() throws Exception { + TestProbe probe = new TestProbe(system); + pipe(Futures.successful("ho!"), system.dispatcher()).to(probe.ref()); + probe.expectMsg("ho!"); + } + + @Test + public void usePipeWithActorSelection() throws Exception { + TestProbe probe = new TestProbe(system); + ActorSelection selection = system.actorSelection(probe.ref().path()); + pipe(Futures.successful("hi!"), system.dispatcher()).to(selection); + probe.expectMsg("hi!"); + } +} diff --git a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala index 57431242ed..71dbe1b45e 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala @@ -5,13 +5,12 @@ package akka.pattern import language.postfixOps +import akka.actor._ import akka.testkit.AkkaSpec -import scala.concurrent.duration._ -import scala.concurrent.Await -import akka.testkit.DefaultTimeout import akka.util.Timeout +import scala.concurrent.Await +import scala.concurrent.duration._ import scala.util.Failure -import akka.actor.{ Actor, Props, ActorRef } class AskSpec extends AkkaSpec { @@ -68,6 +67,16 @@ class AskSpec extends AkkaSpec { }.getMessage must be === expectedMsg } + "work for ActorSelection" in { + implicit val timeout = Timeout(5 seconds) + import system.dispatcher + val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender ! x } }), "select-echo") + val identityFuture = (system.actorSelection("/user/select-echo") ? Identify(None)) + .mapTo[ActorIdentity].map(_.ref.get) + + Await.result(identityFuture, 5 seconds) must be === echo + } + } } diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index b293d3a79d..34f137f27c 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -3,7 +3,7 @@ */ package akka.pattern -import akka.actor.Scheduler +import akka.actor.{ ActorSelection, Scheduler } import scala.concurrent.ExecutionContext import java.util.concurrent.Callable import scala.concurrent.duration.FiniteDuration @@ -75,7 +75,72 @@ object Patterns { * }); * }}} */ - def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[AnyRef] = scalaAsk(actor, message)(new Timeout(timeoutMillis)).asInstanceOf[Future[AnyRef]] + def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[AnyRef] = + scalaAsk(actor, message)(new Timeout(timeoutMillis)).asInstanceOf[Future[AnyRef]] + + /** + * Java API for `akka.pattern.ask`: + * Sends a message asynchronously and returns a [[scala.concurrent.Future]] + * holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]] + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * final Future f = Patterns.ask(selection, request, timeout); + * f.onSuccess(new Procedure() { + * public void apply(Object o) { + * nextActor.tell(new EnrichedResult(request, o)); + * } + * }); + * }}} + */ + def ask(selection: ActorSelection, message: Any, timeout: Timeout): Future[AnyRef] = + scalaAsk(selection, message)(timeout).asInstanceOf[Future[AnyRef]] + + /** + * Java API for `akka.pattern.ask`: + * Sends a message asynchronously and returns a [[scala.concurrent.Future]] + * holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]] + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * final Future f = Patterns.ask(selection, request, timeout); + * f.onSuccess(new Procedure() { + * public void apply(Object o) { + * nextActor.tell(new EnrichedResult(request, o)); + * } + * }); + * }}} + */ + def ask(selection: ActorSelection, message: Any, timeoutMillis: Long): Future[AnyRef] = + scalaAsk(selection, message)(new Timeout(timeoutMillis)).asInstanceOf[Future[AnyRef]] /** * Register an onComplete callback on this [[scala.concurrent.Future]] to send