docs for ask/pipeTo patterns

This commit is contained in:
Roland 2012-01-20 18:09:26 +01:00
parent 020c6b61da
commit 1f99b425de
5 changed files with 119 additions and 72 deletions

View file

@ -37,6 +37,17 @@ import akka.util.Duration;
import akka.actor.ActorTimeoutException;
//#import-gracefulStop
//#import-askPipeTo
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipeTo;
import akka.dispatch.Future;
import akka.dispatch.Futures;
import akka.util.Duration;
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
//#import-askPipeTo
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
@ -45,6 +56,8 @@ import akka.dispatch.MessageDispatcher;
import org.junit.Test;
import scala.Option;
import java.lang.Object;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import akka.pattern.Patterns;
@ -198,6 +211,43 @@ public class UntypedActorDocTestBase {
//#gracefulStop
system.shutdown();
}
class Result {
final int x;
final String s;
public Result(int x, String s) {
this.x = x;
this.s = s;
}
}
@Test
public void usePatternsAskPipeTo() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef actorA = system.actorOf(new Props(MyUntypedActor.class));
ActorRef actorB = system.actorOf(new Props(MyUntypedActor.class));
ActorRef actorC = system.actorOf(new Props(MyUntypedActor.class));
//#ask-pipeTo
final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>();
futures.add(ask(actorA, "request")); // using `akka.actor.timeout` from config
futures.add(ask(actorB, "reqeest", t)); // using explicit timeout from above
final Future<Iterable<Object>> aggregate = Futures.sequence(futures, system.dispatcher());
final Future<Result> transformed = aggregate.map(new akka.japi.Function<Iterable<Object>, Result>() {
public Result apply(Iterable<Object> coll) {
final Iterator<Object> it = coll.iterator();
final String s = (String) it.next();
final int x = (Integer) it.next();
return new Result(x, s);
}
});
pipeTo(transformed, actorC);
//#ask-pipeTo
}
public static class MyActor extends UntypedActor {

View file

@ -315,26 +315,37 @@ If invoked without the sender parameter the sender will be
Ask: Send-And-Receive-Future
----------------------------
Using ``?`` will send a message to the receiving Actor asynchronously and
will immediately return a :class:`Future` which will be completed with
an ``akka.actor.AskTimeoutException`` after the specified timeout:
The ``ask`` pattern involves actors as well as futures, hence it is offered as
a use pattern rather than a method on :class:`ActorRef`:
.. code-block:: java
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#import-askPipeTo
long timeoutMillis = 1000;
Future future = actorRef.ask("Hello", timeoutMillis);
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#ask-pipeTo
The receiving actor should reply to this message, which will complete the
future with the reply message as value; ``getSender.tell(result)``.
This example demonstrates ``ask`` together with the ``pipeTo`` pattern on
futures, because this is likely to be a common combination. Please note that
all of the above is completely non-blocking and asynchronous: ``ask`` produces
a :class:`Future`, two of which are composed into a new future using the
:meth:`Futures.sequence` and :meth:`map` methods and then ``pipeTo`` installs
an ``onComplete``-handler on the future to effect the submission of the
aggregated :class:`Result` to another actor.
Using ``ask`` will send a message to the receiving Actor as with ``tell``, and
the receiving actor must reply with ``getSender().tell(reply)`` in order to
complete the returned :class:`Future` with a value. The ``ask`` operation
involves creating an internal actor for handling this reply, which needs to
have a timeout after which it is destroyed in order not to leak resources; see
more below.
To complete the future with an exception you need send a Failure message to the sender.
This is not done automatically when an actor throws an exception while processing a
This is *not done automatically* when an actor throws an exception while processing a
message.
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#reply-exception
If the actor does not complete the future, it will expire after the timeout period,
specified as parameter to the ``ask`` method.
specified as parameter to the ``ask`` method; this will complete the
:class:`Future` with an :class:`AskTimeoutException`.
See :ref:`futures-java` for more information on how to await or query a
future.
@ -353,15 +364,6 @@ Gives you a way to avoid blocking.
there is not yet a way to detect these illegal accesses at compile time. See also:
:ref:`jmm-shared-state`
The future returned from the ``ask`` method can conveniently be passed around or
chained with further processing steps, but sometimes you just need the value,
even if that entails waiting for it (but keep in mind that waiting inside an
actor is prone to dead-locks, e.g. if obtaining the result depends on
processing another message on this actor).
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java
:include: import-future,using-ask
Forward message
---------------

View file

@ -325,9 +325,9 @@ Send messages
Messages are sent to an Actor through one of the following methods.
* ``!`` means “fire-and-forget”, e.g. send a message asynchronously and return
immediately. Also know as ``tell``.
immediately. Also known as ``tell``.
* ``?`` sends a message asynchronously and returns a :class:`Future`
representing a possible reply. Also know as ``ask``.
representing a possible reply. Also known as ``ask``.
Message ordering is guaranteed on a per-sender basis.
@ -352,25 +352,34 @@ If invoked from an instance that is **not** an Actor the sender will be
Ask: Send-And-Receive-Future
----------------------------
Using ``?`` will send a message to the receiving Actor asynchronously and
will immediately return a :class:`Future` which will be completed with
an ``akka.actor.AskTimeoutException`` after the specified timeout:
The ``ask`` pattern involves actors as well as futures, hence it is offered as
a use pattern rather than a method on :class:`ActorRef`:
.. code-block:: scala
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#ask-pipeTo
val future = actor ? "hello"
This example demonstrates ``ask`` together with the ``pipeTo`` pattern on
futures, because this is likely to be a common combination. Please note that
all of the above is completely non-blocking and asynchronous: ``ask`` produces
a :class:`Future`, three of which are composed into a new future using the
for-comprehension and then ``pipeTo`` installs an ``onComplete``-handler on the
future to effect the submission of the aggregated :class:`Result` to another
actor.
The receiving actor should reply to this message, which will complete the
future with the reply message as value; ``sender ! result``.
Using ``ask`` will send a message to the receiving Actor as with ``tell``, and
the receiving actor must reply with ``sender ! reply`` in order to complete the
returned :class:`Future` with a value. The ``ask`` operation involves creating
an internal actor for handling this reply, which needs to have a timeout after
which it is destroyed in order not to leak resources; see more below.
To complete the future with an exception you need send a Failure message to the sender.
This is not done automatically when an actor throws an exception while processing a
This is *not done automatically* when an actor throws an exception while processing a
message.
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#reply-exception
If the actor does not complete the future, it will expire after the timeout period,
which is taken from one of the following locations in order of precedence:
If the actor does not complete the future, it will expire after the timeout
period, completing it with an :class:`AskTimeoutException`. The timeout is
taken from one of the following locations in order of precedence:
1. explicitly given timeout as in:
@ -380,6 +389,9 @@ which is taken from one of the following locations in order of precedence:
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#using-implicit-timeout
3. actor systems default value from ``akka.actor.timeout`` setting for
:meth:`ask` methods
See :ref:`futures-scala` for more information on how to await or query a
future.
@ -398,23 +410,6 @@ Gives you a way to avoid blocking.
there is not yet a way to detect these illegal accesses at compile time.
See also: :ref:`jmm-shared-state`
The future returned from the ``?`` method can conveniently be passed around or
chained with further processing steps, but sometimes you just need the value,
even if that entails waiting for it (but keep in mind that waiting inside an
actor is prone to dead-locks, e.g. if obtaining the result depends on
processing another message on this actor).
For this purpose, there is the method :meth:`Future.as[T]` which waits until
either the future is completed or its timeout expires, whichever comes first.
The result is then inspected and returned as :class:`Some[T]` if it was
normally completed and the answers runtime type matches the desired type; if
the future contains an exception or the value cannot be cast to the desired
type, it will throw the exception or a :class:`ClassCastException` (if you want
to get :obj:`None` in the latter case, use :meth:`Future.asSilently[T]`). In
case of a timeout, :obj:`None` is returned.
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#using-ask
Forward message
---------------

View file

@ -212,28 +212,6 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
system.stop(myActor)
}
"using ask" in {
//#using-ask
import akka.pattern.ask
class MyActor extends Actor {
def receive = {
case x: String sender ! x.toUpperCase
case n: Int sender ! (n + 1)
}
}
val myActor = system.actorOf(Props(new MyActor), name = "myactor")
implicit val timeout = system.settings.ActorTimeout
val future = ask(myActor, "hello")
for (x future) println(x) //Prints "hello"
val result: Future[Int] = for (x ask(myActor, 3).mapTo[Int]) yield { 2 * x }
//#using-ask
system.stop(myActor)
}
"using implicit timeout" in {
val myActor = system.actorOf(Props(new FirstActor))
//#using-implicit-timeout
@ -331,6 +309,28 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
case e: ActorTimeoutException // the actor wasn't stopped within 5 seconds
}
//#gracefulStop
}
"using pattern ask / pipeTo" in {
val actorA, actorB, actorC, actorD = system.actorOf(Props.empty)
//#ask-pipeTo
import akka.pattern.{ ask, pipeTo }
case class Result(x: Int, s: String, d: Double)
case object Request
implicit val timeout = Timeout(5 seconds) // needed for `?` below
val f: Future[Result] =
for {
x ask(actorA, Request).mapTo[Int] // call pattern directly
s actorB ask Request mapTo manifest[String] // call by implicit conversion
d actorC ? Request mapTo manifest[Double] // call by symbolic name
} yield Result(x, s, d)
f pipeTo actorD // .. or ..
pipeTo(f, actorD)
//#ask-pipeTo
}
}

View file

@ -252,7 +252,7 @@ class Gossiper(remote: Remote, system: ActorSystemImpl) {
try {
val t = remoteSettings.RemoteSystemDaemonAckTimeout
Await.result(connection ? (newGossip, t), t) match {
Await.result(connection.?(newGossip)(t), t) match {
case Success(receiver) log.debug("Gossip sent to [{}] was successfully received", receiver)
case Failure(cause) log.error(cause, cause.toString)
}