Adding 'ask' to replace 'sendRequestReplyFuture' and removing sendRequestReply

This commit is contained in:
Viktor Klang 2011-06-13 13:28:29 +02:00
parent ec9c2e100e
commit fd5afde4ff
11 changed files with 25 additions and 62 deletions

View file

@ -274,54 +274,17 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S
/**
* Akka Java API. <p/>
* @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef)
* Uses the default timeout of the Actor (setTimeout()) and omits the sender reference
*/
def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message, timeout, null)
/**
* Akka Java API. <p/>
* @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef)
* Uses the default timeout of the Actor (setTimeout())
*/
def sendRequestReply(message: AnyRef, sender: ActorRef): AnyRef = sendRequestReply(message, timeout, sender)
/**
* Akka Java API. <p/>
* Sends a message asynchronously and waits on a future for a reply message under the hood.
* <p/>
* It waits on the reply either until it receives it or until the timeout expires
* (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics.
* <p/>
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'sendOneWay' together with 'getContext().getSender()' to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef): AnyRef = {
!!(message, timeout)(Option(sender)).getOrElse(throw new ActorTimeoutException(
"Message [" + message +
"]\n\tfrom [" + (if (sender ne null) sender.address else "nowhere") +
"]\n\twith timeout [" + timeout +
"]\n\ttimed out."))
.asInstanceOf[AnyRef]
}
/**
* Akka Java API. <p/>
* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_]
* @see ask(message: AnyRef, sender: ActorRef): Future[_]
* Uses the Actors default timeout (setTimeout()) and omits the sender
*/
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef): Future[T] = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]]
def ask[T <: AnyRef](message: AnyRef): Future[T] = ask(message, timeout, null).asInstanceOf[Future[T]]
/**
* Akka Java API. <p/>
* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_]
* @see ask(message: AnyRef, sender: ActorRef): Future[_]
* Uses the Actors default timeout (setTimeout())
*/
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T] = sendRequestReplyFuture(message, timeout, sender).asInstanceOf[Future[T]]
def ask[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T] = ask(message, timeout, sender).asInstanceOf[Future[T]]
/**
* Akka Java API. <p/>
@ -331,10 +294,10 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S
* Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* If you are sending messages using <code>ask</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]]
def ask[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]]
/**
* Akka Java API. <p/>

View file

@ -19,7 +19,7 @@ in its mailbox.
None of these mailboxes work with blocking message send, e.g. the message
send operations that are relying on futures; ``!!``, ``!!!``,
``sendRequestReply`` and ``sendRequestReplyFuture``. If the node has crashed
``sendRequestReply`` and ``ask``. If the node has crashed
and then restarted, the thread that was blocked waiting for the reply is gone
and there is no way we can deliver the message.

View file

@ -17,11 +17,11 @@ Use with Actors
There are generally two ways of getting a reply from an ``UntypedActor``: the first is by a sent message (``actorRef.sendOneWay(msg);``), which only works if the original sender was an ``UntypedActor``) and the second is through a ``Future``.
Using the ``ActorRef``\'s ``sendRequestReplyFuture`` method to send a message will return a Future. To wait for and retrieve the actual result the simplest method is:
Using the ``ActorRef``\'s ``ask`` method to send a message will return a Future. To wait for and retrieve the actual result the simplest method is:
.. code-block:: java
Future[Object] future = actorRef.sendRequestReplyFuture[Object](msg);
Future[Object] future = actorRef.ask[Object](msg);
Object result = future.get(); //Block until result is available, usually bad practice
This will cause the current thread to block and wait for the ``UntypedActor`` to 'complete' the ``Future`` with it's reply. Due to the dynamic nature of Akka's ``UntypedActor``\s this result can be anything. The safest way to deal with this is to specify the result to an ``Object`` as is shown in the above example. You can also use the expected result type instead of ``Any``, but if an unexpected type were to be returned you will get a ``ClassCastException``. For more elegant ways to deal with this and to use the result without blocking, refer to `Functional Futures`_.

View file

@ -109,7 +109,7 @@ Send messages
Messages are sent to an Actor through one of the 'send' methods.
* 'sendOneWay' means “fire-and-forget”, e.g. send a message asynchronously and return immediately.
* 'sendRequestReply' means “send-and-reply-eventually”, e.g. send a message asynchronously and wait for a reply through a Future. Here you can specify a timeout. Using timeouts is very important. If no timeout is specified then the actors default timeout (set by the 'getContext().setTimeout(..)' method in the 'ActorRef') is used. This method throws an 'ActorTimeoutException' if the call timed out.
* 'sendRequestReplyFuture' sends a message asynchronously and returns a 'Future'.
* 'ask' sends a message asynchronously and returns a 'Future'.
In all these methods you have the option of passing along your 'ActorRef' context variable. Make it a practice of doing so because it will allow the receiver actors to be able to respond to your message, since the sender reference is sent along with the message.
@ -158,11 +158,11 @@ Here are some examples:
Send-And-Receive-Future
^^^^^^^^^^^^^^^^^^^^^^^
Using 'sendRequestReplyFuture' will send a message to the receiving Actor asynchronously and will immediately return a 'Future'.
Using 'ask' will send a message to the receiving Actor asynchronously and will immediately return a 'Future'.
.. code-block:: java
Future future = actorRef.sendRequestReplyFuture("Hello", getContext(), 1000);
Future future = actorRef.ask("Hello", getContext(), 1000);
The 'Future' interface looks like this:
@ -182,7 +182,7 @@ So the normal way of working with futures is something like this:
.. code-block:: java
Future future = actorRef.sendRequestReplyFuture("Hello", getContext(), 1000);
Future future = actorRef.ask("Hello", getContext(), 1000);
future.await();
if (future.isCompleted()) {
Option resultOption = future.result();
@ -305,7 +305,7 @@ On this 'Option' you can invoke 'boolean isDefined()' or 'boolean isEmpty()' to
Reply using the sender future
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If a message was sent with the 'sendRequestReply' or 'sendRequestReplyFuture' methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the 'reply' method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it.
If a message was sent with the 'sendRequestReply' or 'ask' methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the 'reply' method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it.
The reference to the Future resides in the 'ActorRef' instance and can be retrieved using 'Option<Promise> getSenderFuture()'.

View file

@ -26,7 +26,7 @@ akka {
actor {
timeout = 2000 # Default timeout for Future based invocations
# - Actor: !! && !!!
# - UntypedActor: sendRequestReply && sendRequestReplyFuture
# - UntypedActor: sendRequestReply && ask
# - TypedActor: methods with non-void return type
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness

View file

@ -19,8 +19,8 @@ public class UntypedCoordinatedExample {
Thread.sleep(3000);
Future future1 = counter1.sendRequestReplyFuture("GetCount");
Future future2 = counter2.sendRequestReplyFuture("GetCount");
Future future1 = counter1.ask("GetCount");
Future future2 = counter2.ask("GetCount");
future1.await();
if (future1.isCompleted()) {

View file

@ -18,8 +18,8 @@ public class UntypedTransactorExample {
Thread.sleep(3000);
Future future1 = counter1.sendRequestReplyFuture("GetCount");
Future future2 = counter2.sendRequestReplyFuture("GetCount");
Future future1 = counter1.ask("GetCount");
Future future2 = counter2.ask("GetCount");
future1.await();
if (future1.isCompleted()) {

View file

@ -49,7 +49,7 @@ public class UntypedCoordinatedIncrementTest {
incrementLatch.await(timeout, TimeUnit.SECONDS);
} catch (InterruptedException exception) {}
for (ActorRef counter : counters) {
Future future = counter.sendRequestReplyFuture("GetCount");
Future future = counter.ask("GetCount");
future.await();
if (future.isCompleted()) {
Option resultOption = future.result();
@ -72,7 +72,7 @@ public class UntypedCoordinatedIncrementTest {
incrementLatch.await(timeout, TimeUnit.SECONDS);
} catch (InterruptedException exception) {}
for (ActorRef counter : counters) {
Future future = counter.sendRequestReplyFuture("GetCount");
Future future = counter.ask("GetCount");
future.await();
if (future.isCompleted()) {
Option resultOption = future.result();

View file

@ -48,7 +48,7 @@ public class UntypedTransactorTest {
incrementLatch.await(timeout, TimeUnit.SECONDS);
} catch (InterruptedException exception) {}
for (ActorRef counter : counters) {
Future future = counter.sendRequestReplyFuture("GetCount");
Future future = counter.ask("GetCount");
future.await();
if (future.isCompleted()) {
Option resultOption = future.result();
@ -71,7 +71,7 @@ public class UntypedTransactorTest {
incrementLatch.await(timeout, TimeUnit.SECONDS);
} catch (InterruptedException exception) {}
for (ActorRef counter : counters) {
Future future = counter.sendRequestReplyFuture("GetCount");
Future future = counter.ask("GetCount");
future.await();
if (future.isCompleted()) {
Option resultOption = future.result();

View file

@ -184,7 +184,7 @@ public class Pi {
// send calculate message
long timeout = 60000;
Future<Double> replyFuture = master.sendRequestReplyFuture(new Calculate(), timeout, null);
Future<Double> replyFuture = master.ask(new Calculate(), timeout, null);
Option<Double> result = replyFuture.await().resultOrException();
if (result.isDefined()) {
double pi = result.get();

View file

@ -29,7 +29,7 @@ akka {
actor {
timeout = 5 # Default timeout for Future based invocations
# - Actor: !! && !!!
# - UntypedActor: sendRequestReply && sendRequestReplyFuture
# - UntypedActor: sendRequestReply && ask
# - TypedActor: methods with non-void return type
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # Default throughput for all Dispatcher, set to 1 for complete fairness