diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java index 676254425e..054495b390 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java @@ -37,6 +37,17 @@ import akka.util.Duration; import akka.actor.ActorTimeoutException; //#import-gracefulStop +//#import-askPipeTo +import static akka.pattern.Patterns.ask; +import static akka.pattern.Patterns.pipeTo; +import akka.dispatch.Future; +import akka.dispatch.Futures; +import akka.util.Duration; +import akka.util.Timeout; +import java.util.concurrent.TimeUnit; +import java.util.ArrayList; +//#import-askPipeTo + import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; @@ -45,6 +56,8 @@ import akka.dispatch.MessageDispatcher; import org.junit.Test; import scala.Option; import java.lang.Object; +import java.util.ArrayList; +import java.util.Iterator; import java.util.concurrent.TimeUnit; import akka.pattern.Patterns; @@ -198,6 +211,43 @@ public class UntypedActorDocTestBase { //#gracefulStop system.shutdown(); } + + class Result { + final int x; + final String s; + public Result(int x, String s) { + this.x = x; + this.s = s; + } + } + + @Test + public void usePatternsAskPipeTo() { + ActorSystem system = ActorSystem.create("MySystem"); + ActorRef actorA = system.actorOf(new Props(MyUntypedActor.class)); + ActorRef actorB = system.actorOf(new Props(MyUntypedActor.class)); + ActorRef actorC = system.actorOf(new Props(MyUntypedActor.class)); + //#ask-pipeTo + final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS)); + + final ArrayList> futures = new ArrayList>(); + futures.add(ask(actorA, "request")); // using `akka.actor.timeout` from config + futures.add(ask(actorB, "reqeest", t)); // using explicit timeout from above + + final Future> aggregate = Futures.sequence(futures, system.dispatcher()); + + final Future transformed = aggregate.map(new akka.japi.Function, Result>() { + public Result apply(Iterable coll) { + final Iterator it = coll.iterator(); + final String s = (String) it.next(); + final int x = (Integer) it.next(); + return new Result(x, s); + } + }); + + pipeTo(transformed, actorC); + //#ask-pipeTo + } public static class MyActor extends UntypedActor { diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index 7e0d788590..b2978f7095 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -315,26 +315,37 @@ If invoked without the sender parameter the sender will be Ask: Send-And-Receive-Future ---------------------------- -Using ``?`` will send a message to the receiving Actor asynchronously and -will immediately return a :class:`Future` which will be completed with -an ``akka.actor.AskTimeoutException`` after the specified timeout: +The ``ask`` pattern involves actors as well as futures, hence it is offered as +a use pattern rather than a method on :class:`ActorRef`: -.. code-block:: java +.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#import-askPipeTo - long timeoutMillis = 1000; - Future future = actorRef.ask("Hello", timeoutMillis); +.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#ask-pipeTo -The receiving actor should reply to this message, which will complete the -future with the reply message as value; ``getSender.tell(result)``. +This example demonstrates ``ask`` together with the ``pipeTo`` pattern on +futures, because this is likely to be a common combination. Please note that +all of the above is completely non-blocking and asynchronous: ``ask`` produces +a :class:`Future`, two of which are composed into a new future using the +:meth:`Futures.sequence` and :meth:`map` methods and then ``pipeTo`` installs +an ``onComplete``-handler on the future to effect the submission of the +aggregated :class:`Result` to another actor. + +Using ``ask`` will send a message to the receiving Actor as with ``tell``, and +the receiving actor must reply with ``getSender().tell(reply)`` in order to +complete the returned :class:`Future` with a value. The ``ask`` operation +involves creating an internal actor for handling this reply, which needs to +have a timeout after which it is destroyed in order not to leak resources; see +more below. To complete the future with an exception you need send a Failure message to the sender. -This is not done automatically when an actor throws an exception while processing a +This is *not done automatically* when an actor throws an exception while processing a message. .. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#reply-exception If the actor does not complete the future, it will expire after the timeout period, -specified as parameter to the ``ask`` method. +specified as parameter to the ``ask`` method; this will complete the +:class:`Future` with an :class:`AskTimeoutException`. See :ref:`futures-java` for more information on how to await or query a future. @@ -353,15 +364,6 @@ Gives you a way to avoid blocking. there is not yet a way to detect these illegal accesses at compile time. See also: :ref:`jmm-shared-state` -The future returned from the ``ask`` method can conveniently be passed around or -chained with further processing steps, but sometimes you just need the value, -even if that entails waiting for it (but keep in mind that waiting inside an -actor is prone to dead-locks, e.g. if obtaining the result depends on -processing another message on this actor). - -.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java - :include: import-future,using-ask - Forward message --------------- diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 558b50fac8..da0ae5b91e 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -325,9 +325,9 @@ Send messages Messages are sent to an Actor through one of the following methods. * ``!`` means “fire-and-forget”, e.g. send a message asynchronously and return - immediately. Also know as ``tell``. + immediately. Also known as ``tell``. * ``?`` sends a message asynchronously and returns a :class:`Future` - representing a possible reply. Also know as ``ask``. + representing a possible reply. Also known as ``ask``. Message ordering is guaranteed on a per-sender basis. @@ -352,25 +352,34 @@ If invoked from an instance that is **not** an Actor the sender will be Ask: Send-And-Receive-Future ---------------------------- -Using ``?`` will send a message to the receiving Actor asynchronously and -will immediately return a :class:`Future` which will be completed with -an ``akka.actor.AskTimeoutException`` after the specified timeout: +The ``ask`` pattern involves actors as well as futures, hence it is offered as +a use pattern rather than a method on :class:`ActorRef`: -.. code-block:: scala +.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#ask-pipeTo - val future = actor ? "hello" +This example demonstrates ``ask`` together with the ``pipeTo`` pattern on +futures, because this is likely to be a common combination. Please note that +all of the above is completely non-blocking and asynchronous: ``ask`` produces +a :class:`Future`, three of which are composed into a new future using the +for-comprehension and then ``pipeTo`` installs an ``onComplete``-handler on the +future to effect the submission of the aggregated :class:`Result` to another +actor. -The receiving actor should reply to this message, which will complete the -future with the reply message as value; ``sender ! result``. +Using ``ask`` will send a message to the receiving Actor as with ``tell``, and +the receiving actor must reply with ``sender ! reply`` in order to complete the +returned :class:`Future` with a value. The ``ask`` operation involves creating +an internal actor for handling this reply, which needs to have a timeout after +which it is destroyed in order not to leak resources; see more below. To complete the future with an exception you need send a Failure message to the sender. -This is not done automatically when an actor throws an exception while processing a +This is *not done automatically* when an actor throws an exception while processing a message. .. includecode:: code/akka/docs/actor/ActorDocSpec.scala#reply-exception -If the actor does not complete the future, it will expire after the timeout period, -which is taken from one of the following locations in order of precedence: +If the actor does not complete the future, it will expire after the timeout +period, completing it with an :class:`AskTimeoutException`. The timeout is +taken from one of the following locations in order of precedence: 1. explicitly given timeout as in: @@ -380,6 +389,9 @@ which is taken from one of the following locations in order of precedence: .. includecode:: code/akka/docs/actor/ActorDocSpec.scala#using-implicit-timeout +3. actor system’s default value from ``akka.actor.timeout`` setting for + :meth:`ask` methods + See :ref:`futures-scala` for more information on how to await or query a future. @@ -398,23 +410,6 @@ Gives you a way to avoid blocking. there is not yet a way to detect these illegal accesses at compile time. See also: :ref:`jmm-shared-state` -The future returned from the ``?`` method can conveniently be passed around or -chained with further processing steps, but sometimes you just need the value, -even if that entails waiting for it (but keep in mind that waiting inside an -actor is prone to dead-locks, e.g. if obtaining the result depends on -processing another message on this actor). - -For this purpose, there is the method :meth:`Future.as[T]` which waits until -either the future is completed or its timeout expires, whichever comes first. -The result is then inspected and returned as :class:`Some[T]` if it was -normally completed and the answer’s runtime type matches the desired type; if -the future contains an exception or the value cannot be cast to the desired -type, it will throw the exception or a :class:`ClassCastException` (if you want -to get :obj:`None` in the latter case, use :meth:`Future.asSilently[T]`). In -case of a timeout, :obj:`None` is returned. - -.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#using-ask - Forward message --------------- diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index a753325429..e2bef1776f 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -212,28 +212,6 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { system.stop(myActor) } - "using ask" in { - //#using-ask - import akka.pattern.ask - - class MyActor extends Actor { - def receive = { - case x: String ⇒ sender ! x.toUpperCase - case n: Int ⇒ sender ! (n + 1) - } - } - - val myActor = system.actorOf(Props(new MyActor), name = "myactor") - implicit val timeout = system.settings.ActorTimeout - val future = ask(myActor, "hello") - for (x ← future) println(x) //Prints "hello" - - val result: Future[Int] = for (x ← ask(myActor, 3).mapTo[Int]) yield { 2 * x } - //#using-ask - - system.stop(myActor) - } - "using implicit timeout" in { val myActor = system.actorOf(Props(new FirstActor)) //#using-implicit-timeout @@ -331,6 +309,28 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { case e: ActorTimeoutException ⇒ // the actor wasn't stopped within 5 seconds } //#gracefulStop - } + + "using pattern ask / pipeTo" in { + val actorA, actorB, actorC, actorD = system.actorOf(Props.empty) + //#ask-pipeTo + import akka.pattern.{ ask, pipeTo } + + case class Result(x: Int, s: String, d: Double) + case object Request + + implicit val timeout = Timeout(5 seconds) // needed for `?` below + + val f: Future[Result] = + for { + x ← ask(actorA, Request).mapTo[Int] // call pattern directly + s ← actorB ask Request mapTo manifest[String] // call by implicit conversion + d ← actorC ? Request mapTo manifest[Double] // call by symbolic name + } yield Result(x, s, d) + + f pipeTo actorD // .. or .. + pipeTo(f, actorD) + //#ask-pipeTo + } + } diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 788e86bb1c..74de988c59 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -252,7 +252,7 @@ class Gossiper(remote: Remote, system: ActorSystemImpl) { try { val t = remoteSettings.RemoteSystemDaemonAckTimeout - Await.result(connection ? (newGossip, t), t) match { + Await.result(connection.?(newGossip)(t), t) match { case Success(receiver) ⇒ log.debug("Gossip sent to [{}] was successfully received", receiver) case Failure(cause) ⇒ log.error(cause, cause.toString) }