ask-2.0
This commit is contained in:
parent
774584642e
commit
a44da38e2b
13 changed files with 122 additions and 120 deletions
|
|
@ -104,33 +104,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
|
|||
*/
|
||||
final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender)
|
||||
|
||||
/**
|
||||
* Akka Java API.
|
||||
*
|
||||
* Sends a message asynchronously returns a future holding the eventual reply message.
|
||||
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
|
||||
* timeout has expired.
|
||||
*
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use 'tell' together with the sender
|
||||
* parameter to implement non-blocking request/response message exchanges.
|
||||
*
|
||||
* If you are sending messages using <code>ask</code> and using blocking operations on the Future, such as
|
||||
* 'get', then you <b>have to</b> use <code>getContext().sender().tell(...)</code>
|
||||
* in the target actor to send a reply message to the original sender, and thereby completing the Future,
|
||||
* otherwise the sender will block until the timeout expires.
|
||||
*
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s reference, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*/
|
||||
def ask(message: AnyRef, timeout: Timeout): Future[AnyRef] = ?(message, timeout).asInstanceOf[Future[AnyRef]]
|
||||
|
||||
def ask(message: AnyRef, timeoutMillis: Long): Future[AnyRef] = ask(message, new Timeout(timeoutMillis))
|
||||
|
||||
/**
|
||||
* Forwards the message and passes the original sender actor as the sender.
|
||||
* <p/>
|
||||
|
|
@ -179,35 +152,6 @@ trait ScalaActorRef { ref: ActorRef ⇒
|
|||
*/
|
||||
def !(message: Any)(implicit sender: ActorRef = null): Unit
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously, returning a future which may eventually hold the reply.
|
||||
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
|
||||
* timeout has expired.
|
||||
*
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use '!' together with implicit or explicit
|
||||
* sender parameter to implement non-blocking request/response message exchanges.
|
||||
*
|
||||
* If you are sending messages using <code>ask</code> and using blocking operations on the Future, such as
|
||||
* 'get', then you <b>have to</b> use <code>getContext().sender().tell(...)</code>
|
||||
* in the target actor to send a reply message to the original sender, and thereby completing the Future,
|
||||
* otherwise the sender will block until the timeout expires.
|
||||
*
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s reference, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*/
|
||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any]
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously, returning a future which may eventually hold the reply.
|
||||
* The implicit parameter with the default value is just there to disambiguate it from the version that takes the
|
||||
* implicit timeout
|
||||
*/
|
||||
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -236,6 +180,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
|
|||
def stop(): Unit
|
||||
def sendSystemMessage(message: SystemMessage): Unit
|
||||
def getParent: InternalActorRef
|
||||
def provider: ActorRefProvider
|
||||
/**
|
||||
* Obtain ActorRef by possibly traversing the actor tree or looking it up at
|
||||
* some provider-specific location. This method shall return the end result,
|
||||
|
|
@ -321,6 +266,8 @@ private[akka] class LocalActorRef private[akka] (
|
|||
|
||||
def getParent: InternalActorRef = actorCell.parent
|
||||
|
||||
def provider = actorCell.provider
|
||||
|
||||
/**
|
||||
* Method for looking up a single child beneath this actor. Override in order
|
||||
* to inject “synthetic” actor paths like “/temp”.
|
||||
|
|
@ -365,17 +312,6 @@ private[akka] class LocalActorRef private[akka] (
|
|||
|
||||
def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender)
|
||||
|
||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
|
||||
actorCell.provider.ask(timeout) match {
|
||||
case Some(a) ⇒
|
||||
this.!(message)(a)
|
||||
a.result
|
||||
case None ⇒
|
||||
this.!(message)(null)
|
||||
Promise[Any]()(actorCell.system.dispatcher)
|
||||
}
|
||||
}
|
||||
|
||||
def restart(cause: Throwable): Unit = actorCell.restart(cause)
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
|
|
@ -405,6 +341,8 @@ case class SerializedActorRef(path: String) {
|
|||
trait MinimalActorRef extends InternalActorRef with LocalRef {
|
||||
|
||||
def getParent: InternalActorRef = Nobody
|
||||
def provider: ActorRefProvider =
|
||||
throw new UnsupportedOperationException("Not supported for [%s]".format(getClass.getName))
|
||||
def getChild(names: Iterator[String]): InternalActorRef = {
|
||||
val dropped = names.dropWhile(_.isEmpty)
|
||||
if (dropped.isEmpty) this
|
||||
|
|
@ -420,9 +358,6 @@ trait MinimalActorRef extends InternalActorRef with LocalRef {
|
|||
|
||||
def !(message: Any)(implicit sender: ActorRef = null): Unit = ()
|
||||
|
||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
|
||||
throw new UnsupportedOperationException("Not supported for [%s]".format(getClass.getName))
|
||||
|
||||
def sendSystemMessage(message: SystemMessage): Unit = ()
|
||||
def restart(cause: Throwable): Unit = ()
|
||||
|
||||
|
|
@ -471,13 +406,6 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
|
|||
case _ ⇒ eventStream.publish(DeadLetter(message, sender, this))
|
||||
}
|
||||
|
||||
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
|
||||
eventStream.publish(DeadLetter(message, this, this))
|
||||
// leave this in: guard with good visibility against really stupid/weird errors
|
||||
assert(brokenPromise != null)
|
||||
brokenPromise
|
||||
}
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
|
||||
}
|
||||
|
|
@ -558,9 +486,6 @@ class AskActorRef(
|
|||
case _ ⇒
|
||||
}
|
||||
|
||||
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
|
||||
Promise.failed(new UnsupportedOperationException("Ask/? is not supported for [%s]".format(getClass.getName)))(dispatcher)
|
||||
|
||||
override def isTerminated = result.isCompleted
|
||||
|
||||
override def stop(): Unit = if (running.getAndSet(false)) {
|
||||
|
|
|
|||
|
|
@ -56,6 +56,8 @@ trait ActorRefProvider {
|
|||
|
||||
def settings: ActorSystem.Settings
|
||||
|
||||
def dispatcher: MessageDispatcher
|
||||
|
||||
/**
|
||||
* Initialization of an ActorRefProvider happens in two steps: first
|
||||
* construction of the object with settings, eventStream, scheduler, etc.
|
||||
|
|
|
|||
|
|
@ -8,6 +8,9 @@ package object actor {
|
|||
implicit def actorRef2Scala(ref: ActorRef): ScalaActorRef = ref.asInstanceOf[ScalaActorRef]
|
||||
implicit def scala2ActorRef(ref: ScalaActorRef): ActorRef = ref.asInstanceOf[ActorRef]
|
||||
|
||||
implicit def actorRef2Askable(actorRef: ActorRef) = new dispatch.AskableActorRef(actorRef)
|
||||
implicit def askable2ActorRef(askable: dispatch.AskableActorRef) = askable.actorRef
|
||||
|
||||
type Uuid = com.eaio.uuid.UUID
|
||||
|
||||
def newUuid(): Uuid = new Uuid()
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger,
|
|||
import akka.dispatch.Await.CanAwait
|
||||
import java.util.concurrent._
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.{ ActorRef, InternalActorRef }
|
||||
|
||||
object Await {
|
||||
sealed trait CanAwait
|
||||
|
|
@ -53,6 +54,24 @@ object Await {
|
|||
*/
|
||||
object Futures {
|
||||
|
||||
def ask(actor: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = {
|
||||
val provider = actor.asInstanceOf[InternalActorRef].provider
|
||||
provider.ask(timeout) match {
|
||||
case Some(a) ⇒
|
||||
actor.!(message)(a)
|
||||
a.result
|
||||
case None ⇒
|
||||
actor.!(message)(null)
|
||||
Promise[Any]()(provider.dispatcher)
|
||||
}
|
||||
}
|
||||
|
||||
def ask(actor: ActorRef, message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] =
|
||||
ask(actor, message)(timeout)
|
||||
|
||||
def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[Any] =
|
||||
ask(actor, message)(new Timeout(timeoutMillis))
|
||||
|
||||
/**
|
||||
* Java API, equivalent to Future.apply
|
||||
*/
|
||||
|
|
@ -134,6 +153,66 @@ object Futures {
|
|||
}
|
||||
}
|
||||
|
||||
final class AskableActorRef(val actorRef: ActorRef) {
|
||||
|
||||
/**
|
||||
* Akka Java API.
|
||||
*
|
||||
* Sends a message asynchronously returns a future holding the eventual reply message.
|
||||
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
|
||||
* timeout has expired.
|
||||
*
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use 'tell' together with the sender
|
||||
* parameter to implement non-blocking request/response message exchanges.
|
||||
*
|
||||
* If you are sending messages using <code>ask</code> and using blocking operations on the Future, such as
|
||||
* 'get', then you <b>have to</b> use <code>getContext().sender().tell(...)</code>
|
||||
* in the target actor to send a reply message to the original sender, and thereby completing the Future,
|
||||
* otherwise the sender will block until the timeout expires.
|
||||
*
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s reference, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*/
|
||||
def ask(message: AnyRef, timeout: Timeout): Future[AnyRef] = ?(message, timeout).asInstanceOf[Future[AnyRef]]
|
||||
|
||||
def ask(message: AnyRef, timeoutMillis: Long): Future[AnyRef] = ask(message, new Timeout(timeoutMillis))
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously, returning a future which may eventually hold the reply.
|
||||
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
|
||||
* timeout has expired.
|
||||
*
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use '!' together with implicit or explicit
|
||||
* sender parameter to implement non-blocking request/response message exchanges.
|
||||
*
|
||||
* If you are sending messages using <code>ask</code> and using blocking operations on the Future, such as
|
||||
* 'get', then you <b>have to</b> use <code>getContext().sender().tell(...)</code>
|
||||
* in the target actor to send a reply message to the original sender, and thereby completing the Future,
|
||||
* otherwise the sender will block until the timeout expires.
|
||||
*
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s reference, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*/
|
||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = Futures.ask(actorRef, message)
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously, returning a future which may eventually hold the reply.
|
||||
* The implicit parameter with the default value is just there to disambiguate it from the version that takes the
|
||||
* implicit timeout
|
||||
*/
|
||||
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
|
||||
}
|
||||
|
||||
object Future {
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import akka.actor.Props;
|
|||
import akka.actor.Terminated;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.dispatch.Await;
|
||||
import static akka.dispatch.Futures.ask;
|
||||
import akka.util.Duration;
|
||||
import akka.testkit.AkkaSpec;
|
||||
import akka.testkit.TestProbe;
|
||||
|
|
@ -126,19 +127,19 @@ public class FaultHandlingTestBase {
|
|||
//#create
|
||||
Props superprops = new Props(Supervisor.class).withFaultHandler(strategy);
|
||||
ActorRef supervisor = system.actorOf(superprops, "supervisor");
|
||||
ActorRef child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
|
||||
ActorRef child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
|
||||
//#create
|
||||
|
||||
//#resume
|
||||
child.tell(42);
|
||||
assert Await.result(child.ask("get", 5000), timeout).equals(42);
|
||||
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
|
||||
child.tell(new ArithmeticException());
|
||||
assert Await.result(child.ask("get", 5000), timeout).equals(42);
|
||||
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
|
||||
//#resume
|
||||
|
||||
//#restart
|
||||
child.tell(new NullPointerException());
|
||||
assert Await.result(child.ask("get", 5000), timeout).equals(0);
|
||||
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
|
||||
//#restart
|
||||
|
||||
//#stop
|
||||
|
|
@ -149,9 +150,9 @@ public class FaultHandlingTestBase {
|
|||
//#stop
|
||||
|
||||
//#escalate-kill
|
||||
child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
|
||||
child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
|
||||
probe.watch(child);
|
||||
assert Await.result(child.ask("get", 5000), timeout).equals(0);
|
||||
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
|
||||
child.tell(new Exception());
|
||||
probe.expectMsg(new Terminated(child));
|
||||
//#escalate-kill
|
||||
|
|
@ -159,11 +160,11 @@ public class FaultHandlingTestBase {
|
|||
//#escalate-restart
|
||||
superprops = new Props(Supervisor2.class).withFaultHandler(strategy);
|
||||
supervisor = system.actorOf(superprops, "supervisor2");
|
||||
child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
|
||||
child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
|
||||
child.tell(23);
|
||||
assert Await.result(child.ask("get", 5000), timeout).equals(23);
|
||||
assert Await.result(ask(child, "get", 5000), timeout).equals(23);
|
||||
child.tell(new Exception());
|
||||
assert Await.result(child.ask("get", 5000), timeout).equals(0);
|
||||
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
|
||||
//#escalate-restart
|
||||
//#testkit
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import akka.actor.Props;
|
|||
|
||||
//#import-future
|
||||
import akka.dispatch.Future;
|
||||
import akka.dispatch.Futures;
|
||||
import akka.dispatch.Await;
|
||||
import akka.util.Duration;
|
||||
import akka.util.Timeout;
|
||||
|
|
@ -117,7 +118,7 @@ public class UntypedActorDocTestBase {
|
|||
}), "myactor");
|
||||
|
||||
//#using-ask
|
||||
Future<Object> future = myActor.ask("Hello", 1000);
|
||||
Future<Object> future = Futures.ask(myActor, "Hello", 1000);
|
||||
Object result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
|
||||
//#using-ask
|
||||
system.shutdown();
|
||||
|
|
|
|||
|
|
@ -53,7 +53,6 @@ import akka.actor.Status.Failure;
|
|||
import akka.actor.ActorSystem;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.docs.actor.MyUntypedActor;
|
||||
import akka.actor.Props;
|
||||
import akka.dispatch.Futures;
|
||||
|
||||
|
|
@ -79,7 +78,7 @@ public class FutureDocTestBase {
|
|||
String msg = "hello";
|
||||
//#ask-blocking
|
||||
Timeout timeout = system.settings().ActorTimeout();
|
||||
Future<Object> future = actor.ask(msg, timeout);
|
||||
Future<Object> future = Futures.ask(actor, msg, timeout);
|
||||
String result = (String) Await.result(future, timeout.duration());
|
||||
//#ask-blocking
|
||||
assertEquals("HELLO", result);
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import akka.actor.Actor
|
|||
import akka.actor.Props
|
||||
import akka.event.Logging
|
||||
import akka.dispatch.Future
|
||||
import akka.dispatch.Futures
|
||||
|
||||
//#imports1
|
||||
|
||||
|
|
@ -229,10 +230,10 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
|
||||
val myActor = system.actorOf(Props(new MyActor), name = "myactor")
|
||||
implicit val timeout = system.settings.ActorTimeout
|
||||
val future = myActor ? "hello"
|
||||
val future = Futures.ask(myActor, "hello")
|
||||
for (x ← future) println(x) //Prints "hello"
|
||||
|
||||
val result: Future[Int] = for (x ← (myActor ? 3).mapTo[Int]) yield { 2 * x }
|
||||
val result: Future[Int] = for (x ← Futures.ask(myActor, 3).mapTo[Int]) yield { 2 * x }
|
||||
//#using-ask
|
||||
|
||||
system.stop(myActor)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import akka.actor.Actor
|
|||
import akka.actor.Props
|
||||
import akka.actor.Status.Failure
|
||||
import akka.dispatch.Future
|
||||
import akka.dispatch.Futures
|
||||
import akka.dispatch.Await
|
||||
import akka.util.duration._
|
||||
import akka.dispatch.Promise
|
||||
|
|
@ -46,7 +47,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
import akka.dispatch.Await
|
||||
|
||||
implicit val timeout = system.settings.ActorTimeout
|
||||
val future = actor ? msg
|
||||
val future = Futures.ask(actor, msg)
|
||||
val result = Await.result(future, timeout.duration).asInstanceOf[String]
|
||||
//#ask-blocking
|
||||
result must be("HELLO")
|
||||
|
|
@ -59,7 +60,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
//#map-to
|
||||
import akka.dispatch.Future
|
||||
|
||||
val future: Future[String] = (actor ? msg).mapTo[String]
|
||||
val future: Future[String] = Futures.ask(actor, msg).mapTo[String]
|
||||
//#map-to
|
||||
Await.result(future, timeout.duration) must be("HELLO")
|
||||
}
|
||||
|
|
@ -149,13 +150,13 @@ class FutureDocSpec extends AkkaSpec {
|
|||
import akka.dispatch.Await
|
||||
//#composing-wrong
|
||||
|
||||
val f1 = actor1 ? msg1
|
||||
val f2 = actor2 ? msg2
|
||||
val f1 = Futures.ask(actor1, msg1)
|
||||
val f2 = Futures.ask(actor2, msg2)
|
||||
|
||||
val a = Await.result(f1, 1 second).asInstanceOf[Int]
|
||||
val b = Await.result(f2, 1 second).asInstanceOf[Int]
|
||||
|
||||
val f3 = actor3 ? (a + b)
|
||||
val f3 = Futures.ask(actor3, (a + b))
|
||||
|
||||
val result = Await.result(f3, 1 second).asInstanceOf[Int]
|
||||
//#composing-wrong
|
||||
|
|
@ -172,13 +173,13 @@ class FutureDocSpec extends AkkaSpec {
|
|||
import akka.dispatch.Await
|
||||
//#composing
|
||||
|
||||
val f1 = actor1 ? msg1
|
||||
val f2 = actor2 ? msg2
|
||||
val f1 = Futures.ask(actor1, msg1)
|
||||
val f2 = Futures.ask(actor2, msg2)
|
||||
|
||||
val f3 = for {
|
||||
a ← f1.mapTo[Int]
|
||||
b ← f2.mapTo[Int]
|
||||
c ← (actor3 ? (a + b)).mapTo[Int]
|
||||
c ← Futures.ask(actor3, (a + b)).mapTo[Int]
|
||||
} yield c
|
||||
|
||||
val result = Await.result(f3, 1 second).asInstanceOf[Int]
|
||||
|
|
@ -191,7 +192,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
val oddActor = system.actorOf(Props[OddActor])
|
||||
//#sequence-ask
|
||||
// oddActor returns odd numbers sequentially from 1 as a List[Future[Int]]
|
||||
val listOfFutures = List.fill(100)((oddActor ? GetNext).mapTo[Int])
|
||||
val listOfFutures = List.fill(100)(Futures.ask(oddActor, GetNext).mapTo[Int])
|
||||
|
||||
// now we have a Future[List[Int]]
|
||||
val futureList = Future.sequence(listOfFutures)
|
||||
|
|
@ -239,7 +240,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
val actor = system.actorOf(Props[MyActor])
|
||||
val msg1 = -1
|
||||
//#recover
|
||||
val future = actor ? msg1 recover {
|
||||
val future = Futures.ask(actor, msg1) recover {
|
||||
case e: ArithmeticException ⇒ 0
|
||||
}
|
||||
//#recover
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import akka.routing.{ ScatterGatherFirstCompletedRouter, BroadcastRouter, Random
|
|||
import annotation.tailrec
|
||||
import akka.actor.{ Props, Actor }
|
||||
import akka.util.duration._
|
||||
import akka.dispatch.Await
|
||||
import akka.dispatch.{ Futures, Await }
|
||||
|
||||
case class FibonacciNumber(nbr: Int)
|
||||
|
||||
|
|
@ -71,7 +71,7 @@ class ParentActor extends Actor {
|
|||
Props[FibonacciActor].withRouter(ScatterGatherFirstCompletedRouter(within = 2 seconds)),
|
||||
"router")
|
||||
implicit val timeout = context.system.settings.ActorTimeout
|
||||
val futureResult = scatterGatherFirstCompletedRouter ? FibonacciNumber(10)
|
||||
val futureResult = Futures.ask(scatterGatherFirstCompletedRouter, FibonacciNumber(10))
|
||||
val result = Await.result(futureResult, timeout.duration)
|
||||
//#scatterGatherFirstCompletedRouter
|
||||
println("The result of calculating Fibonacci for 10 is %d".format(result))
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package akka.docs.testkit
|
|||
import akka.testkit.TestProbe
|
||||
import akka.util.duration._
|
||||
import akka.actor._
|
||||
import akka.dispatch.Futures
|
||||
|
||||
//#imports-test-probe
|
||||
|
||||
|
|
@ -204,7 +205,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
import akka.util.duration._
|
||||
//#test-probe-reply
|
||||
val probe = TestProbe()
|
||||
val future = probe.ref ? "hello"
|
||||
val future = Futures.ask(probe.ref, "hello")
|
||||
probe.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher
|
||||
probe.sender ! "world"
|
||||
assert(future.isCompleted && future.value == Some(Right("world")))
|
||||
|
|
|
|||
|
|
@ -160,7 +160,7 @@ trait RemoteRef extends ActorRefScope {
|
|||
* This reference is network-aware (remembers its origin) and immutable.
|
||||
*/
|
||||
private[akka] class RemoteActorRef private[akka] (
|
||||
provider: RemoteActorRefProvider,
|
||||
override val provider: RemoteActorRefProvider,
|
||||
remote: RemoteSupport[ParsedTransportAddress],
|
||||
val path: ActorPath,
|
||||
val getParent: InternalActorRef,
|
||||
|
|
@ -185,17 +185,6 @@ private[akka] class RemoteActorRef private[akka] (
|
|||
|
||||
override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this, loader)
|
||||
|
||||
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
|
||||
provider.ask(timeout) match {
|
||||
case Some(a) ⇒
|
||||
this.!(message)(a)
|
||||
a.result
|
||||
case None ⇒
|
||||
this.!(message)(null)
|
||||
Promise[Any]()(provider.dispatcher)
|
||||
}
|
||||
}
|
||||
|
||||
def suspend(): Unit = sendSystemMessage(Suspend())
|
||||
|
||||
def resume(): Unit = sendSystemMessage(Resume())
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ class TestActorRef[T <: Actor](
|
|||
underlying.actor.asInstanceOf[T] match {
|
||||
case null ⇒
|
||||
val t = underlying.system.settings.ActorTimeout
|
||||
Await.result(?(InternalGetActor)(t), t.duration).asInstanceOf[T]
|
||||
Await.result(this.?(InternalGetActor)(t), t.duration).asInstanceOf[T]
|
||||
case ref ⇒ ref
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue