diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index f1378db41a..c4009fdbb4 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -104,33 +104,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
*/
final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender)
- /**
- * Akka Java API.
- *
- * Sends a message asynchronously returns a future holding the eventual reply message.
- * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
- * timeout has expired.
- *
- * NOTE:
- * Use this method with care. In most cases it is better to use 'tell' together with the sender
- * parameter to implement non-blocking request/response message exchanges.
- *
- * If you are sending messages using ask and using blocking operations on the Future, such as
- * 'get', then you have to use getContext().sender().tell(...)
- * in the target actor to send a reply message to the original sender, and thereby completing the Future,
- * otherwise the sender will block until the timeout expires.
- *
- * When using future callbacks, inside actors you need to carefully avoid closing over
- * the containing actor’s reference, i.e. do not call methods or access mutable state
- * on the enclosing actor from within the callback. This would break the actor
- * encapsulation and may introduce synchronization bugs and race conditions because
- * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
- * there is not yet a way to detect these illegal accesses at compile time.
- */
- def ask(message: AnyRef, timeout: Timeout): Future[AnyRef] = ?(message, timeout).asInstanceOf[Future[AnyRef]]
-
- def ask(message: AnyRef, timeoutMillis: Long): Future[AnyRef] = ask(message, new Timeout(timeoutMillis))
-
/**
* Forwards the message and passes the original sender actor as the sender.
*
ask and using blocking operations on the Future, such as
- * 'get', then you have to use getContext().sender().tell(...)
- * in the target actor to send a reply message to the original sender, and thereby completing the Future,
- * otherwise the sender will block until the timeout expires.
- *
- * When using future callbacks, inside actors you need to carefully avoid closing over
- * the containing actor’s reference, i.e. do not call methods or access mutable state
- * on the enclosing actor from within the callback. This would break the actor
- * encapsulation and may introduce synchronization bugs and race conditions because
- * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
- * there is not yet a way to detect these illegal accesses at compile time.
- */
- def ?(message: Any)(implicit timeout: Timeout): Future[Any]
-
- /**
- * Sends a message asynchronously, returning a future which may eventually hold the reply.
- * The implicit parameter with the default value is just there to disambiguate it from the version that takes the
- * implicit timeout
- */
- def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
}
/**
@@ -236,6 +180,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
def stop(): Unit
def sendSystemMessage(message: SystemMessage): Unit
def getParent: InternalActorRef
+ def provider: ActorRefProvider
/**
* Obtain ActorRef by possibly traversing the actor tree or looking it up at
* some provider-specific location. This method shall return the end result,
@@ -321,6 +266,8 @@ private[akka] class LocalActorRef private[akka] (
def getParent: InternalActorRef = actorCell.parent
+ def provider = actorCell.provider
+
/**
* Method for looking up a single child beneath this actor. Override in order
* to inject “synthetic” actor paths like “/temp”.
@@ -365,17 +312,6 @@ private[akka] class LocalActorRef private[akka] (
def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender)
- def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
- actorCell.provider.ask(timeout) match {
- case Some(a) ⇒
- this.!(message)(a)
- a.result
- case None ⇒
- this.!(message)(null)
- Promise[Any]()(actorCell.system.dispatcher)
- }
- }
-
def restart(cause: Throwable): Unit = actorCell.restart(cause)
@throws(classOf[java.io.ObjectStreamException])
@@ -405,6 +341,8 @@ case class SerializedActorRef(path: String) {
trait MinimalActorRef extends InternalActorRef with LocalRef {
def getParent: InternalActorRef = Nobody
+ def provider: ActorRefProvider =
+ throw new UnsupportedOperationException("Not supported for [%s]".format(getClass.getName))
def getChild(names: Iterator[String]): InternalActorRef = {
val dropped = names.dropWhile(_.isEmpty)
if (dropped.isEmpty) this
@@ -420,9 +358,6 @@ trait MinimalActorRef extends InternalActorRef with LocalRef {
def !(message: Any)(implicit sender: ActorRef = null): Unit = ()
- def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
- throw new UnsupportedOperationException("Not supported for [%s]".format(getClass.getName))
-
def sendSystemMessage(message: SystemMessage): Unit = ()
def restart(cause: Throwable): Unit = ()
@@ -471,13 +406,6 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
case _ ⇒ eventStream.publish(DeadLetter(message, sender, this))
}
- override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
- eventStream.publish(DeadLetter(message, this, this))
- // leave this in: guard with good visibility against really stupid/weird errors
- assert(brokenPromise != null)
- brokenPromise
- }
-
@throws(classOf[java.io.ObjectStreamException])
override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
}
@@ -558,9 +486,6 @@ class AskActorRef(
case _ ⇒
}
- override def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
- Promise.failed(new UnsupportedOperationException("Ask/? is not supported for [%s]".format(getClass.getName)))(dispatcher)
-
override def isTerminated = result.isCompleted
override def stop(): Unit = if (running.getAndSet(false)) {
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
index d940aa2c20..861a234db3 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
@@ -56,6 +56,8 @@ trait ActorRefProvider {
def settings: ActorSystem.Settings
+ def dispatcher: MessageDispatcher
+
/**
* Initialization of an ActorRefProvider happens in two steps: first
* construction of the object with settings, eventStream, scheduler, etc.
diff --git a/akka-actor/src/main/scala/akka/actor/package.scala b/akka-actor/src/main/scala/akka/actor/package.scala
index cfe5bc1b0d..0b06470c79 100644
--- a/akka-actor/src/main/scala/akka/actor/package.scala
+++ b/akka-actor/src/main/scala/akka/actor/package.scala
@@ -8,6 +8,9 @@ package object actor {
implicit def actorRef2Scala(ref: ActorRef): ScalaActorRef = ref.asInstanceOf[ScalaActorRef]
implicit def scala2ActorRef(ref: ScalaActorRef): ActorRef = ref.asInstanceOf[ActorRef]
+ implicit def actorRef2Askable(actorRef: ActorRef) = new dispatch.AskableActorRef(actorRef)
+ implicit def askable2ActorRef(askable: dispatch.AskableActorRef) = askable.actorRef
+
type Uuid = com.eaio.uuid.UUID
def newUuid(): Uuid = new Uuid()
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index fea97fbaf3..c70ec32c43 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger,
import akka.dispatch.Await.CanAwait
import java.util.concurrent._
import akka.actor.ActorSystem
+import akka.actor.{ ActorRef, InternalActorRef }
object Await {
sealed trait CanAwait
@@ -53,6 +54,24 @@ object Await {
*/
object Futures {
+ def ask(actor: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = {
+ val provider = actor.asInstanceOf[InternalActorRef].provider
+ provider.ask(timeout) match {
+ case Some(a) ⇒
+ actor.!(message)(a)
+ a.result
+ case None ⇒
+ actor.!(message)(null)
+ Promise[Any]()(provider.dispatcher)
+ }
+ }
+
+ def ask(actor: ActorRef, message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] =
+ ask(actor, message)(timeout)
+
+ def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[Any] =
+ ask(actor, message)(new Timeout(timeoutMillis))
+
/**
* Java API, equivalent to Future.apply
*/
@@ -134,6 +153,66 @@ object Futures {
}
}
+final class AskableActorRef(val actorRef: ActorRef) {
+
+ /**
+ * Akka Java API.
+ *
+ * Sends a message asynchronously returns a future holding the eventual reply message.
+ * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
+ * timeout has expired.
+ *
+ * NOTE:
+ * Use this method with care. In most cases it is better to use 'tell' together with the sender
+ * parameter to implement non-blocking request/response message exchanges.
+ *
+ * If you are sending messages using ask and using blocking operations on the Future, such as
+ * 'get', then you have to use getContext().sender().tell(...)
+ * in the target actor to send a reply message to the original sender, and thereby completing the Future,
+ * otherwise the sender will block until the timeout expires.
+ *
+ * When using future callbacks, inside actors you need to carefully avoid closing over
+ * the containing actor’s reference, i.e. do not call methods or access mutable state
+ * on the enclosing actor from within the callback. This would break the actor
+ * encapsulation and may introduce synchronization bugs and race conditions because
+ * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
+ * there is not yet a way to detect these illegal accesses at compile time.
+ */
+ def ask(message: AnyRef, timeout: Timeout): Future[AnyRef] = ?(message, timeout).asInstanceOf[Future[AnyRef]]
+
+ def ask(message: AnyRef, timeoutMillis: Long): Future[AnyRef] = ask(message, new Timeout(timeoutMillis))
+
+ /**
+ * Sends a message asynchronously, returning a future which may eventually hold the reply.
+ * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
+ * timeout has expired.
+ *
+ * NOTE:
+ * Use this method with care. In most cases it is better to use '!' together with implicit or explicit
+ * sender parameter to implement non-blocking request/response message exchanges.
+ *
+ * If you are sending messages using ask and using blocking operations on the Future, such as
+ * 'get', then you have to use getContext().sender().tell(...)
+ * in the target actor to send a reply message to the original sender, and thereby completing the Future,
+ * otherwise the sender will block until the timeout expires.
+ *
+ * When using future callbacks, inside actors you need to carefully avoid closing over
+ * the containing actor’s reference, i.e. do not call methods or access mutable state
+ * on the enclosing actor from within the callback. This would break the actor
+ * encapsulation and may introduce synchronization bugs and race conditions because
+ * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
+ * there is not yet a way to detect these illegal accesses at compile time.
+ */
+ def ?(message: Any)(implicit timeout: Timeout): Future[Any] = Futures.ask(actorRef, message)
+
+ /**
+ * Sends a message asynchronously, returning a future which may eventually hold the reply.
+ * The implicit parameter with the default value is just there to disambiguate it from the version that takes the
+ * implicit timeout
+ */
+ def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
+}
+
object Future {
/**
diff --git a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java
index 132dc990ee..1bc3b40c4b 100644
--- a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java
+++ b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java
@@ -13,6 +13,7 @@ import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.dispatch.Await;
+import static akka.dispatch.Futures.ask;
import akka.util.Duration;
import akka.testkit.AkkaSpec;
import akka.testkit.TestProbe;
@@ -126,19 +127,19 @@ public class FaultHandlingTestBase {
//#create
Props superprops = new Props(Supervisor.class).withFaultHandler(strategy);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
- ActorRef child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
+ ActorRef child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
//#create
//#resume
child.tell(42);
- assert Await.result(child.ask("get", 5000), timeout).equals(42);
+ assert Await.result(ask(child, "get", 5000), timeout).equals(42);
child.tell(new ArithmeticException());
- assert Await.result(child.ask("get", 5000), timeout).equals(42);
+ assert Await.result(ask(child, "get", 5000), timeout).equals(42);
//#resume
//#restart
child.tell(new NullPointerException());
- assert Await.result(child.ask("get", 5000), timeout).equals(0);
+ assert Await.result(ask(child, "get", 5000), timeout).equals(0);
//#restart
//#stop
@@ -149,9 +150,9 @@ public class FaultHandlingTestBase {
//#stop
//#escalate-kill
- child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
+ child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
probe.watch(child);
- assert Await.result(child.ask("get", 5000), timeout).equals(0);
+ assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception());
probe.expectMsg(new Terminated(child));
//#escalate-kill
@@ -159,11 +160,11 @@ public class FaultHandlingTestBase {
//#escalate-restart
superprops = new Props(Supervisor2.class).withFaultHandler(strategy);
supervisor = system.actorOf(superprops, "supervisor2");
- child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
+ child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
child.tell(23);
- assert Await.result(child.ask("get", 5000), timeout).equals(23);
+ assert Await.result(ask(child, "get", 5000), timeout).equals(23);
child.tell(new Exception());
- assert Await.result(child.ask("get", 5000), timeout).equals(0);
+ assert Await.result(ask(child, "get", 5000), timeout).equals(0);
//#escalate-restart
//#testkit
}
diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java
index d442ae6461..be164aa850 100644
--- a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java
+++ b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java
@@ -11,6 +11,7 @@ import akka.actor.Props;
//#import-future
import akka.dispatch.Future;
+import akka.dispatch.Futures;
import akka.dispatch.Await;
import akka.util.Duration;
import akka.util.Timeout;
@@ -117,7 +118,7 @@ public class UntypedActorDocTestBase {
}), "myactor");
//#using-ask
- Future