diff --git a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala index 10df9d2218..a019b4afb3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala @@ -47,7 +47,7 @@ class ForwardActorSpec extends AkkaSpec { "forward actor reference when invoking forward on ask" in { val chain = createForwardingChain(system) - chain.ask(ExpectedMessage, 5000) onSuccess { case ExpectedMessage ⇒ testActor ! ExpectedMessage } + chain.ask(ExpectedMessage)(5000) onSuccess { case ExpectedMessage ⇒ testActor ! ExpectedMessage } expectMsg(5 seconds, ExpectedMessage) } } diff --git a/akka-actor/src/main/scala/akka/Patterns.scala b/akka-actor/src/main/scala/akka/Patterns.scala new file mode 100644 index 0000000000..7650c4077f --- /dev/null +++ b/akka-actor/src/main/scala/akka/Patterns.scala @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka + +object Patterns { + import akka.actor.ActorRef + import akka.dispatch.Future + import akka.patterns.{ ask ⇒ scalaAsk } + import akka.util.Timeout + + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * final Future f = Patterns.ask(worker, request, timeout); + * f.onSuccess(new Procedure() { + * public void apply(Object o) { + * nextActor.tell(new EnrichedResult(request, o)); + * } + * }); + * }}} + */ + def ask(actor: ActorRef, message: Any, timeout: Timeout): Future[AnyRef] = scalaAsk(actor, message)(timeout).asInstanceOf[Future[AnyRef]] + + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * final Future f = Patterns.ask(worker, request, timeout); + * f.onSuccess(new Procedure() { + * public void apply(Object o) { + * nextActor.tell(new EnrichedResult(request, o)); + * } + * }); + * }}} + */ + def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[AnyRef] = scalaAsk(actor, message)(new Timeout(timeoutMillis)).asInstanceOf[Future[AnyRef]] +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index b689aa5370..e7c03e8ae0 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -167,6 +167,13 @@ trait LocalRef extends ActorRefScope { final def isLocal = true } +/** + * Trait for matching on ActorRefs which have access to a provider; this is used in akka.patterns.ask. + */ +trait ActorRefWithProvider { this: InternalActorRef ⇒ + def provider: ActorRefProvider +} + /** * Internal trait for assembling all the functionality needed internally on * ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE! @@ -180,7 +187,6 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe def stop(): Unit def sendSystemMessage(message: SystemMessage): Unit def getParent: InternalActorRef - def provider: ActorRefProvider /** * Obtain ActorRef by possibly traversing the actor tree or looking it up at * some provider-specific location. This method shall return the end result, @@ -212,7 +218,7 @@ private[akka] class LocalActorRef private[akka] ( val systemService: Boolean = false, _receiveTimeout: Option[Duration] = None, _hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap) - extends InternalActorRef with LocalRef { + extends InternalActorRef with LocalRef with ActorRefWithProvider { /* * actorCell.start() publishes actorCell & this to the dispatcher, which @@ -341,8 +347,7 @@ case class SerializedActorRef(path: String) { trait MinimalActorRef extends InternalActorRef with LocalRef { def getParent: InternalActorRef = Nobody - def provider: ActorRefProvider = - throw new UnsupportedOperationException("Not supported for [%s]".format(getClass.getName)) + def getChild(names: Iterator[String]): InternalActorRef = { val dropped = names.dropWhile(_.isEmpty) if (dropped.isEmpty) this @@ -466,10 +471,14 @@ class AskTimeoutException(message: String, cause: Throwable) extends TimeoutExce def this(message: String) = this(message, null: Throwable) } +/** + * Akka private optimized representation of the temporary actor spawned to + * receive the reply to an "ask" operation. + */ private[akka] final class PromiseActorRef( val path: ActorPath, override val getParent: InternalActorRef, - private final val result: Promise[Any], + val result: Promise[Any], val deathWatch: DeathWatch) extends MinimalActorRef { final val running = new AtomicBoolean(true) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 6af8c36198..4408f7562e 100755 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -104,7 +104,7 @@ trait ActorRefProvider { * Create AskActorRef and register it properly so it can be serialized/deserialized; * caller needs to send the message. */ - def ask(result: Promise[Any], within: Timeout): Option[ActorRef] + def ask(within: Timeout): Option[PromiseActorRef] /** * This Future is completed upon termination of this ActorRefProvider, which @@ -494,12 +494,13 @@ class LocalActorRefProvider( } } - def ask(result: Promise[Any], within: Timeout): Option[ActorRef] = { + def ask(within: Timeout): Option[PromiseActorRef] = { (if (within == null) settings.ActorTimeout else within) match { case t if t.duration.length <= 0 ⇒ None case t ⇒ val path = tempPath() val name = path.name + val result = Promise[Any]()(dispatcher) val a = new PromiseActorRef(path, tempContainer, result, deathWatch) tempContainer.addChild(name, a) val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) } @@ -509,24 +510,6 @@ class LocalActorRefProvider( } Some(a) - - // Alternative implementation: - // Create a full-blown actor to complete the promise. - // This would also work but not as efficient as PromiseActorRef. - //val b = actorOf(system, Props(new Actor { - // def receive = { - // case Status.Success(r) ⇒ result.success(r) - // case Status.Failure(f) ⇒ result.failure(f) - // case other ⇒ result.success(other) - // } - //}), systemGuardian, systemGuardian.path / "promise" / tempName(), false, None) - //val ff = system.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) } - //result onComplete { _ ⇒ - // b.stop() - // ff.cancel() - //} - // - //Some(b) } } } diff --git a/akka-actor/src/main/scala/akka/actor/package.scala b/akka-actor/src/main/scala/akka/actor/package.scala index acfbb19e5d..cfe5bc1b0d 100644 --- a/akka-actor/src/main/scala/akka/actor/package.scala +++ b/akka-actor/src/main/scala/akka/actor/package.scala @@ -39,126 +39,3 @@ package object actor { } } - -package object patterns { - - import akka.actor.{ ActorRef, InternalActorRef } - import akka.dispatch.Promise - import akka.util.Timeout - - implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef()(actorRef) - - // Implicit for converting a Promise to an ActorRef. - // Symmetric to the future2actor conversion, which allows - // piping a Future result (read side) to an Actor's mailbox, this - // conversion allows using an Actor to complete a Promise (write side) - // - // Future.ask / actor ? message is now a trivial implementation that can - // also be done in user code (assuming actorRef, timeout and dispatcher implicits): - // - // Patterns.ask(actor, message) = { - // val promise = Promise[Any]() - // actor ! (message, promise) - // promise - // } - - @inline implicit def promise2actorRef(promise: Promise[Any])(implicit actorRef: ActorRef, timeout: Timeout): ActorRef = { - val provider = actorRef.asInstanceOf[InternalActorRef].provider - provider.ask(promise, timeout) match { - case Some(ref) ⇒ ref - case None ⇒ null - } - } - -} - -package patterns { - - import akka.actor.{ ActorRef, InternalActorRef } - import akka.dispatch.{ Future, Promise } - import akka.util.Timeout - - final class AskableActorRef(implicit val actorRef: ActorRef) { - - /** - * Akka Java API. - * - * Sends a message asynchronously returns a future holding the eventual reply message. - * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given - * timeout has expired. - * - * NOTE: - * Use this method with care. In most cases it is better to use 'tell' together with the sender - * parameter to implement non-blocking request/response message exchanges. - * - * If you are sending messages using ask and using blocking operations on the Future, such as - * 'get', then you have to use getContext().sender().tell(...) - * in the target actor to send a reply message to the original sender, and thereby completing the Future, - * otherwise the sender will block until the timeout expires. - * - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s reference, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - */ - def ask(message: AnyRef, timeout: Timeout): Future[AnyRef] = ?(message, timeout).asInstanceOf[Future[AnyRef]] - - def ask(message: AnyRef, timeoutMillis: Long): Future[AnyRef] = ask(message, new Timeout(timeoutMillis)) - - /** - * Sends a message asynchronously, returning a future which may eventually hold the reply. - * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given - * timeout has expired. - * - * NOTE: - * Use this method with care. In most cases it is better to use '!' together with implicit or explicit - * sender parameter to implement non-blocking request/response message exchanges. - * - * If you are sending messages using ask and using blocking operations on the Future, such as - * 'get', then you have to use getContext().sender().tell(...) - * in the target actor to send a reply message to the original sender, and thereby completing the Future, - * otherwise the sender will block until the timeout expires. - * - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s reference, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - */ - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { - implicit val dispatcher = actorRef.asInstanceOf[InternalActorRef].provider.dispatcher - val promise = Promise[Any]() - actorRef.!(message)(promise) - promise - } - - /** - * Sends a message asynchronously, returning a future which may eventually hold the reply. - * The implicit parameter with the default value is just there to disambiguate it from the version that takes the - * implicit timeout - */ - def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) - } - -} - -object Patterns { - - import akka.actor.ActorRef - import akka.dispatch.Future - import akka.patterns.{ ask ⇒ actorRef2Askable } - import akka.util.Timeout - - def ask(actor: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = - actorRef2Askable(actor).?(message) - - def ask(actor: ActorRef, message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = - actorRef2Askable(actor).?(message)(timeout) - - def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[Any] = - actorRef2Askable(actor).?(message)(new Timeout(timeoutMillis)) - -} diff --git a/akka-actor/src/main/scala/akka/patterns/AskableActorRef.scala b/akka-actor/src/main/scala/akka/patterns/AskableActorRef.scala new file mode 100644 index 0000000000..1b42fc664c --- /dev/null +++ b/akka-actor/src/main/scala/akka/patterns/AskableActorRef.scala @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.patterns + +import akka.actor.{ ActorRef, InternalActorRef, AskTimeoutException } +import akka.dispatch.{ Future, Promise } +import akka.util.Timeout + +final class AskableActorRef(val actorRef: ActorRef) { + + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * val f = worker.ask(request)(timeout) + * flow { + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] + */ + def ask(message: AnyRef)(implicit timeout: Timeout): Future[Any] = akka.patterns.ask(actorRef, message)(timeout) + + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * val f = worker ? request + * flow { + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] + */ + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.patterns.ask(actorRef, message) + + /* + * FIXME: I think this should be removed, since it introduces an “ambiguity” + * when sending Tuple2, which the compiler resolves unexpectedly to this + * method; also overloading is bad, isn’t it? - RK (ticket #1653) + */ + /** + * Sends a message asynchronously, returning a future which may eventually hold the reply. + * The implicit parameter with the default value is just there to disambiguate it from the version that takes the + * implicit timeout + */ + def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) + +} diff --git a/akka-actor/src/main/scala/akka/patterns/package.scala b/akka-actor/src/main/scala/akka/patterns/package.scala new file mode 100644 index 0000000000..d0cd07766a --- /dev/null +++ b/akka-actor/src/main/scala/akka/patterns/package.scala @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka + +package object patterns { + + import akka.actor.{ ActorRef, InternalActorRef, ActorRefWithProvider, AskTimeoutException } + import akka.dispatch.{ Future, Promise } + import akka.util.Timeout + + /** + * Import this implicit conversion to gain `?` and `ask` methods on + * [[akka.actor.ActorRef]], which will defer to the + * `ask(actorRef, message)(timeout)` method defined here. + * + * {{{ + * import akka.patterns.ask + * + * val future = actor ? message // => ask(actor, message) + * val future = actor ask message // => ask(actor, message) + * val future = actor.ask(message)(timeout) // => ask(actor, message)(timeout) + * }}} + * + * All of the above use an implicit [[akka.actor.Timeout]]. + */ + implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef) + + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * val f = ask(worker, request)(timeout) + * flow { + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see [[akka.dispatch.Future]] for a description of `flow`] + */ + def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match { + case ref: ActorRefWithProvider ⇒ + ref.provider.ask(timeout) match { + case Some(ref) ⇒ + actorRef.tell(message, ref) + ref.result + case None ⇒ + actorRef.tell(message) + Promise.failed(new AskTimeoutException("failed to create PromiseActorRef"))(ref.provider.dispatcher) + } + case _ ⇒ throw new IllegalArgumentException("incompatible ActorRef " + actorRef) + } + +} diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 36f5fe9670..11f8fee4af 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -351,10 +351,7 @@ trait BroadcastLike { this: RouterConfig ⇒ createAndRegisterRoutees(props, context, nrOfInstances, routees) { - case (sender, message) ⇒ - message match { - case _ ⇒ toAll(sender, ref.routees) - } + case (sender, message) ⇒ toAll(sender, ref.routees) } } } @@ -407,12 +404,9 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ { case (sender, message) ⇒ val provider: ActorRefProvider = context.asInstanceOf[ActorCell].systemImpl.provider - val promise = Promise[Any]()(provider.dispatcher) - val asker = provider.ask(promise, Timeout(within)).get - promise.pipeTo(sender) - message match { - case _ ⇒ toAll(asker, ref.routees) - } + val asker = provider.ask(Timeout(within)).get + asker.result.pipeTo(sender) + toAll(asker, ref.routees) } } } diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index f804e29bac..fac3411cde 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -8,7 +8,6 @@ import akka.actor.Actor import akka.actor.Props import akka.event.Logging import akka.dispatch.Future -import akka.Patterns //#imports1 @@ -221,6 +220,8 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { "using ask" in { //#using-ask + import akka.patterns.ask + class MyActor extends Actor { def receive = { case x: String ⇒ sender ! x.toUpperCase @@ -230,10 +231,10 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val myActor = system.actorOf(Props(new MyActor), name = "myactor") implicit val timeout = system.settings.ActorTimeout - val future = Patterns.ask(myActor, "hello") + val future = ask(myActor, "hello") for (x ← future) println(x) //Prints "hello" - val result: Future[Int] = for (x ← Patterns.ask(myActor, 3).mapTo[Int]) yield { 2 * x } + val result: Future[Int] = for (x ← ask(myActor, 3).mapTo[Int]) yield { 2 * x } //#using-ask system.stop(myActor) diff --git a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala index f65f8224db..5c86a0c295 100644 --- a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala @@ -13,7 +13,6 @@ import akka.dispatch.Future import akka.dispatch.Await import akka.util.duration._ import akka.dispatch.Promise -import akka.Patterns object FutureDocSpec { @@ -45,9 +44,10 @@ class FutureDocSpec extends AkkaSpec { val msg = "hello" //#ask-blocking import akka.dispatch.Await + import akka.patterns.ask implicit val timeout = system.settings.ActorTimeout - val future = Patterns.ask(actor, msg) + val future = actor ? msg // enabled by the “ask” import val result = Await.result(future, timeout.duration).asInstanceOf[String] //#ask-blocking result must be("HELLO") @@ -59,8 +59,9 @@ class FutureDocSpec extends AkkaSpec { implicit val timeout = system.settings.ActorTimeout //#map-to import akka.dispatch.Future + import akka.patterns.ask - val future: Future[String] = Patterns.ask(actor, msg).mapTo[String] + val future: Future[String] = ask(actor, msg).mapTo[String] //#map-to Await.result(future, timeout.duration) must be("HELLO") } @@ -148,15 +149,16 @@ class FutureDocSpec extends AkkaSpec { val msg2 = 2 implicit val timeout = system.settings.ActorTimeout import akka.dispatch.Await + import akka.patterns.ask //#composing-wrong - val f1 = Patterns.ask(actor1, msg1) - val f2 = Patterns.ask(actor2, msg2) + val f1 = ask(actor1, msg1) + val f2 = ask(actor2, msg2) val a = Await.result(f1, 1 second).asInstanceOf[Int] val b = Await.result(f2, 1 second).asInstanceOf[Int] - val f3 = Patterns.ask(actor3, (a + b)) + val f3 = ask(actor3, (a + b)) val result = Await.result(f3, 1 second).asInstanceOf[Int] //#composing-wrong @@ -171,15 +173,16 @@ class FutureDocSpec extends AkkaSpec { val msg2 = 2 implicit val timeout = system.settings.ActorTimeout import akka.dispatch.Await + import akka.patterns.ask //#composing - val f1 = Patterns.ask(actor1, msg1) - val f2 = Patterns.ask(actor2, msg2) + val f1 = ask(actor1, msg1) + val f2 = ask(actor2, msg2) val f3 = for { a ← f1.mapTo[Int] b ← f2.mapTo[Int] - c ← Patterns.ask(actor3, (a + b)).mapTo[Int] + c ← ask(actor3, (a + b)).mapTo[Int] } yield c val result = Await.result(f3, 1 second).asInstanceOf[Int] @@ -192,7 +195,7 @@ class FutureDocSpec extends AkkaSpec { val oddActor = system.actorOf(Props[OddActor]) //#sequence-ask // oddActor returns odd numbers sequentially from 1 as a List[Future[Int]] - val listOfFutures = List.fill(100)(Patterns.ask(oddActor, GetNext).mapTo[Int]) + val listOfFutures = List.fill(100)(akka.patterns.ask(oddActor, GetNext).mapTo[Int]) // now we have a Future[List[Int]] val futureList = Future.sequence(listOfFutures) @@ -240,7 +243,7 @@ class FutureDocSpec extends AkkaSpec { val actor = system.actorOf(Props[MyActor]) val msg1 = -1 //#recover - val future = Patterns.ask(actor, msg1) recover { + val future = akka.patterns.ask(actor, msg1) recover { case e: ArithmeticException ⇒ 0 } //#recover diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5b38996d3b..65417d9063 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -138,7 +138,7 @@ class RemoteActorRefProvider( def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path) - def ask(result: Promise[Any], within: Timeout): Option[ActorRef] = local.ask(result, within) + def ask(within: Timeout): Option[PromiseActorRef] = local.ask(within) /** * Using (checking out) actor on a specific node. @@ -160,12 +160,12 @@ trait RemoteRef extends ActorRefScope { * This reference is network-aware (remembers its origin) and immutable. */ private[akka] class RemoteActorRef private[akka] ( - override val provider: RemoteActorRefProvider, + val provider: RemoteActorRefProvider, remote: RemoteSupport[ParsedTransportAddress], val path: ActorPath, val getParent: InternalActorRef, loader: Option[ClassLoader]) - extends InternalActorRef with RemoteRef { + extends InternalActorRef with RemoteRef with ActorRefWithProvider { def getChild(name: Iterator[String]): InternalActorRef = { val s = name.toStream