polish “ask” pattern, see #1581
- move package objects into their respective package.scala file in the right directories - make implicit conversion as well as explicit facility available under the same name akka.patterns.ask for easy import - revert the logic to produce the Promise for the PromiseActorRef within the ActorRefProvider; supporting wrapping of external Promises does not seem to justify doing needless extra allocations in case of failure - add scaladocs - factor out “def provider” into trait ActorRefWithProvider, as it didn’t feel right attaching this information “by exception” to MinimalActorRef
This commit is contained in:
parent
ce1d2f4721
commit
9c762dec20
11 changed files with 268 additions and 176 deletions
|
|
@ -47,7 +47,7 @@ class ForwardActorSpec extends AkkaSpec {
|
|||
|
||||
"forward actor reference when invoking forward on ask" in {
|
||||
val chain = createForwardingChain(system)
|
||||
chain.ask(ExpectedMessage, 5000) onSuccess { case ExpectedMessage ⇒ testActor ! ExpectedMessage }
|
||||
chain.ask(ExpectedMessage)(5000) onSuccess { case ExpectedMessage ⇒ testActor ! ExpectedMessage }
|
||||
expectMsg(5 seconds, ExpectedMessage)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
71
akka-actor/src/main/scala/akka/Patterns.scala
Normal file
71
akka-actor/src/main/scala/akka/Patterns.scala
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka
|
||||
|
||||
object Patterns {
|
||||
import akka.actor.ActorRef
|
||||
import akka.dispatch.Future
|
||||
import akka.patterns.{ ask ⇒ scalaAsk }
|
||||
import akka.util.Timeout
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
|
||||
* holding the eventual reply message; this means that the target actor
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.actor.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s object, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*
|
||||
* <b>Recommended usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* final Future<Object> f = Patterns.ask(worker, request, timeout);
|
||||
* f.onSuccess(new Procedure<Object>() {
|
||||
* public void apply(Object o) {
|
||||
* nextActor.tell(new EnrichedResult(request, o));
|
||||
* }
|
||||
* });
|
||||
* }}}
|
||||
*/
|
||||
def ask(actor: ActorRef, message: Any, timeout: Timeout): Future[AnyRef] = scalaAsk(actor, message)(timeout).asInstanceOf[Future[AnyRef]]
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
|
||||
* holding the eventual reply message; this means that the target actor
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.actor.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s object, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*
|
||||
* <b>Recommended usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* final Future<Object> f = Patterns.ask(worker, request, timeout);
|
||||
* f.onSuccess(new Procedure<Object>() {
|
||||
* public void apply(Object o) {
|
||||
* nextActor.tell(new EnrichedResult(request, o));
|
||||
* }
|
||||
* });
|
||||
* }}}
|
||||
*/
|
||||
def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[AnyRef] = scalaAsk(actor, message)(new Timeout(timeoutMillis)).asInstanceOf[Future[AnyRef]]
|
||||
}
|
||||
|
|
@ -167,6 +167,13 @@ trait LocalRef extends ActorRefScope {
|
|||
final def isLocal = true
|
||||
}
|
||||
|
||||
/**
|
||||
* Trait for matching on ActorRefs which have access to a provider; this is used in akka.patterns.ask.
|
||||
*/
|
||||
trait ActorRefWithProvider { this: InternalActorRef ⇒
|
||||
def provider: ActorRefProvider
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal trait for assembling all the functionality needed internally on
|
||||
* ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE!
|
||||
|
|
@ -180,7 +187,6 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
|
|||
def stop(): Unit
|
||||
def sendSystemMessage(message: SystemMessage): Unit
|
||||
def getParent: InternalActorRef
|
||||
def provider: ActorRefProvider
|
||||
/**
|
||||
* Obtain ActorRef by possibly traversing the actor tree or looking it up at
|
||||
* some provider-specific location. This method shall return the end result,
|
||||
|
|
@ -212,7 +218,7 @@ private[akka] class LocalActorRef private[akka] (
|
|||
val systemService: Boolean = false,
|
||||
_receiveTimeout: Option[Duration] = None,
|
||||
_hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap)
|
||||
extends InternalActorRef with LocalRef {
|
||||
extends InternalActorRef with LocalRef with ActorRefWithProvider {
|
||||
|
||||
/*
|
||||
* actorCell.start() publishes actorCell & this to the dispatcher, which
|
||||
|
|
@ -341,8 +347,7 @@ case class SerializedActorRef(path: String) {
|
|||
trait MinimalActorRef extends InternalActorRef with LocalRef {
|
||||
|
||||
def getParent: InternalActorRef = Nobody
|
||||
def provider: ActorRefProvider =
|
||||
throw new UnsupportedOperationException("Not supported for [%s]".format(getClass.getName))
|
||||
|
||||
def getChild(names: Iterator[String]): InternalActorRef = {
|
||||
val dropped = names.dropWhile(_.isEmpty)
|
||||
if (dropped.isEmpty) this
|
||||
|
|
@ -466,10 +471,14 @@ class AskTimeoutException(message: String, cause: Throwable) extends TimeoutExce
|
|||
def this(message: String) = this(message, null: Throwable)
|
||||
}
|
||||
|
||||
/**
|
||||
* Akka private optimized representation of the temporary actor spawned to
|
||||
* receive the reply to an "ask" operation.
|
||||
*/
|
||||
private[akka] final class PromiseActorRef(
|
||||
val path: ActorPath,
|
||||
override val getParent: InternalActorRef,
|
||||
private final val result: Promise[Any],
|
||||
val result: Promise[Any],
|
||||
val deathWatch: DeathWatch) extends MinimalActorRef {
|
||||
|
||||
final val running = new AtomicBoolean(true)
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ trait ActorRefProvider {
|
|||
* Create AskActorRef and register it properly so it can be serialized/deserialized;
|
||||
* caller needs to send the message.
|
||||
*/
|
||||
def ask(result: Promise[Any], within: Timeout): Option[ActorRef]
|
||||
def ask(within: Timeout): Option[PromiseActorRef]
|
||||
|
||||
/**
|
||||
* This Future is completed upon termination of this ActorRefProvider, which
|
||||
|
|
@ -494,12 +494,13 @@ class LocalActorRefProvider(
|
|||
}
|
||||
}
|
||||
|
||||
def ask(result: Promise[Any], within: Timeout): Option[ActorRef] = {
|
||||
def ask(within: Timeout): Option[PromiseActorRef] = {
|
||||
(if (within == null) settings.ActorTimeout else within) match {
|
||||
case t if t.duration.length <= 0 ⇒ None
|
||||
case t ⇒
|
||||
val path = tempPath()
|
||||
val name = path.name
|
||||
val result = Promise[Any]()(dispatcher)
|
||||
val a = new PromiseActorRef(path, tempContainer, result, deathWatch)
|
||||
tempContainer.addChild(name, a)
|
||||
val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) }
|
||||
|
|
@ -509,24 +510,6 @@ class LocalActorRefProvider(
|
|||
}
|
||||
|
||||
Some(a)
|
||||
|
||||
// Alternative implementation:
|
||||
// Create a full-blown actor to complete the promise.
|
||||
// This would also work but not as efficient as PromiseActorRef.
|
||||
//val b = actorOf(system, Props(new Actor {
|
||||
// def receive = {
|
||||
// case Status.Success(r) ⇒ result.success(r)
|
||||
// case Status.Failure(f) ⇒ result.failure(f)
|
||||
// case other ⇒ result.success(other)
|
||||
// }
|
||||
//}), systemGuardian, systemGuardian.path / "promise" / tempName(), false, None)
|
||||
//val ff = system.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) }
|
||||
//result onComplete { _ ⇒
|
||||
// b.stop()
|
||||
// ff.cancel()
|
||||
//}
|
||||
//
|
||||
//Some(b)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,126 +39,3 @@ package object actor {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
package object patterns {
|
||||
|
||||
import akka.actor.{ ActorRef, InternalActorRef }
|
||||
import akka.dispatch.Promise
|
||||
import akka.util.Timeout
|
||||
|
||||
implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef()(actorRef)
|
||||
|
||||
// Implicit for converting a Promise to an ActorRef.
|
||||
// Symmetric to the future2actor conversion, which allows
|
||||
// piping a Future result (read side) to an Actor's mailbox, this
|
||||
// conversion allows using an Actor to complete a Promise (write side)
|
||||
//
|
||||
// Future.ask / actor ? message is now a trivial implementation that can
|
||||
// also be done in user code (assuming actorRef, timeout and dispatcher implicits):
|
||||
//
|
||||
// Patterns.ask(actor, message) = {
|
||||
// val promise = Promise[Any]()
|
||||
// actor ! (message, promise)
|
||||
// promise
|
||||
// }
|
||||
|
||||
@inline implicit def promise2actorRef(promise: Promise[Any])(implicit actorRef: ActorRef, timeout: Timeout): ActorRef = {
|
||||
val provider = actorRef.asInstanceOf[InternalActorRef].provider
|
||||
provider.ask(promise, timeout) match {
|
||||
case Some(ref) ⇒ ref
|
||||
case None ⇒ null
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
package patterns {
|
||||
|
||||
import akka.actor.{ ActorRef, InternalActorRef }
|
||||
import akka.dispatch.{ Future, Promise }
|
||||
import akka.util.Timeout
|
||||
|
||||
final class AskableActorRef(implicit val actorRef: ActorRef) {
|
||||
|
||||
/**
|
||||
* Akka Java API.
|
||||
*
|
||||
* Sends a message asynchronously returns a future holding the eventual reply message.
|
||||
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
|
||||
* timeout has expired.
|
||||
*
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use 'tell' together with the sender
|
||||
* parameter to implement non-blocking request/response message exchanges.
|
||||
*
|
||||
* If you are sending messages using <code>ask</code> and using blocking operations on the Future, such as
|
||||
* 'get', then you <b>have to</b> use <code>getContext().sender().tell(...)</code>
|
||||
* in the target actor to send a reply message to the original sender, and thereby completing the Future,
|
||||
* otherwise the sender will block until the timeout expires.
|
||||
*
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s reference, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*/
|
||||
def ask(message: AnyRef, timeout: Timeout): Future[AnyRef] = ?(message, timeout).asInstanceOf[Future[AnyRef]]
|
||||
|
||||
def ask(message: AnyRef, timeoutMillis: Long): Future[AnyRef] = ask(message, new Timeout(timeoutMillis))
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously, returning a future which may eventually hold the reply.
|
||||
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
|
||||
* timeout has expired.
|
||||
*
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use '!' together with implicit or explicit
|
||||
* sender parameter to implement non-blocking request/response message exchanges.
|
||||
*
|
||||
* If you are sending messages using <code>ask</code> and using blocking operations on the Future, such as
|
||||
* 'get', then you <b>have to</b> use <code>getContext().sender().tell(...)</code>
|
||||
* in the target actor to send a reply message to the original sender, and thereby completing the Future,
|
||||
* otherwise the sender will block until the timeout expires.
|
||||
*
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s reference, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*/
|
||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
|
||||
implicit val dispatcher = actorRef.asInstanceOf[InternalActorRef].provider.dispatcher
|
||||
val promise = Promise[Any]()
|
||||
actorRef.!(message)(promise)
|
||||
promise
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously, returning a future which may eventually hold the reply.
|
||||
* The implicit parameter with the default value is just there to disambiguate it from the version that takes the
|
||||
* implicit timeout
|
||||
*/
|
||||
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object Patterns {
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.dispatch.Future
|
||||
import akka.patterns.{ ask ⇒ actorRef2Askable }
|
||||
import akka.util.Timeout
|
||||
|
||||
def ask(actor: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] =
|
||||
actorRef2Askable(actor).?(message)
|
||||
|
||||
def ask(actor: ActorRef, message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] =
|
||||
actorRef2Askable(actor).?(message)(timeout)
|
||||
|
||||
def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[Any] =
|
||||
actorRef2Askable(actor).?(message)(new Timeout(timeoutMillis))
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,84 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.patterns
|
||||
|
||||
import akka.actor.{ ActorRef, InternalActorRef, AskTimeoutException }
|
||||
import akka.dispatch.{ Future, Promise }
|
||||
import akka.util.Timeout
|
||||
|
||||
final class AskableActorRef(val actorRef: ActorRef) {
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
|
||||
* holding the eventual reply message; this means that the target actor
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.actor.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s object, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*
|
||||
* <b>Recommended usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* val f = worker.ask(request)(timeout)
|
||||
* flow {
|
||||
* EnrichedRequest(request, f())
|
||||
* } pipeTo nextActor
|
||||
* }}}
|
||||
*
|
||||
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
|
||||
*/
|
||||
def ask(message: AnyRef)(implicit timeout: Timeout): Future[Any] = akka.patterns.ask(actorRef, message)(timeout)
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
|
||||
* holding the eventual reply message; this means that the target actor
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.actor.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s object, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*
|
||||
* <b>Recommended usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* val f = worker ? request
|
||||
* flow {
|
||||
* EnrichedRequest(request, f())
|
||||
* } pipeTo nextActor
|
||||
* }}}
|
||||
*
|
||||
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
|
||||
*/
|
||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.patterns.ask(actorRef, message)
|
||||
|
||||
/*
|
||||
* FIXME: I think this should be removed, since it introduces an “ambiguity”
|
||||
* when sending Tuple2, which the compiler resolves unexpectedly to this
|
||||
* method; also overloading is bad, isn’t it? - RK (ticket #1653)
|
||||
*/
|
||||
/**
|
||||
* Sends a message asynchronously, returning a future which may eventually hold the reply.
|
||||
* The implicit parameter with the default value is just there to disambiguate it from the version that takes the
|
||||
* implicit timeout
|
||||
*/
|
||||
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
|
||||
|
||||
}
|
||||
70
akka-actor/src/main/scala/akka/patterns/package.scala
Normal file
70
akka-actor/src/main/scala/akka/patterns/package.scala
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka
|
||||
|
||||
package object patterns {
|
||||
|
||||
import akka.actor.{ ActorRef, InternalActorRef, ActorRefWithProvider, AskTimeoutException }
|
||||
import akka.dispatch.{ Future, Promise }
|
||||
import akka.util.Timeout
|
||||
|
||||
/**
|
||||
* Import this implicit conversion to gain `?` and `ask` methods on
|
||||
* [[akka.actor.ActorRef]], which will defer to the
|
||||
* `ask(actorRef, message)(timeout)` method defined here.
|
||||
*
|
||||
* {{{
|
||||
* import akka.patterns.ask
|
||||
*
|
||||
* val future = actor ? message // => ask(actor, message)
|
||||
* val future = actor ask message // => ask(actor, message)
|
||||
* val future = actor.ask(message)(timeout) // => ask(actor, message)(timeout)
|
||||
* }}}
|
||||
*
|
||||
* All of the above use an implicit [[akka.actor.Timeout]].
|
||||
*/
|
||||
implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef)
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
|
||||
* holding the eventual reply message; this means that the target actor
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.actor.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s object, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*
|
||||
* <b>Recommended usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* val f = ask(worker, request)(timeout)
|
||||
* flow {
|
||||
* EnrichedRequest(request, f())
|
||||
* } pipeTo nextActor
|
||||
* }}}
|
||||
*
|
||||
* [see [[akka.dispatch.Future]] for a description of `flow`]
|
||||
*/
|
||||
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
|
||||
case ref: ActorRefWithProvider ⇒
|
||||
ref.provider.ask(timeout) match {
|
||||
case Some(ref) ⇒
|
||||
actorRef.tell(message, ref)
|
||||
ref.result
|
||||
case None ⇒
|
||||
actorRef.tell(message)
|
||||
Promise.failed(new AskTimeoutException("failed to create PromiseActorRef"))(ref.provider.dispatcher)
|
||||
}
|
||||
case _ ⇒ throw new IllegalArgumentException("incompatible ActorRef " + actorRef)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -351,10 +351,7 @@ trait BroadcastLike { this: RouterConfig ⇒
|
|||
createAndRegisterRoutees(props, context, nrOfInstances, routees)
|
||||
|
||||
{
|
||||
case (sender, message) ⇒
|
||||
message match {
|
||||
case _ ⇒ toAll(sender, ref.routees)
|
||||
}
|
||||
case (sender, message) ⇒ toAll(sender, ref.routees)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -407,12 +404,9 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒
|
|||
{
|
||||
case (sender, message) ⇒
|
||||
val provider: ActorRefProvider = context.asInstanceOf[ActorCell].systemImpl.provider
|
||||
val promise = Promise[Any]()(provider.dispatcher)
|
||||
val asker = provider.ask(promise, Timeout(within)).get
|
||||
promise.pipeTo(sender)
|
||||
message match {
|
||||
case _ ⇒ toAll(asker, ref.routees)
|
||||
}
|
||||
val asker = provider.ask(Timeout(within)).get
|
||||
asker.result.pipeTo(sender)
|
||||
toAll(asker, ref.routees)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import akka.actor.Actor
|
|||
import akka.actor.Props
|
||||
import akka.event.Logging
|
||||
import akka.dispatch.Future
|
||||
import akka.Patterns
|
||||
|
||||
//#imports1
|
||||
|
||||
|
|
@ -221,6 +220,8 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
|
||||
"using ask" in {
|
||||
//#using-ask
|
||||
import akka.patterns.ask
|
||||
|
||||
class MyActor extends Actor {
|
||||
def receive = {
|
||||
case x: String ⇒ sender ! x.toUpperCase
|
||||
|
|
@ -230,10 +231,10 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
|
||||
val myActor = system.actorOf(Props(new MyActor), name = "myactor")
|
||||
implicit val timeout = system.settings.ActorTimeout
|
||||
val future = Patterns.ask(myActor, "hello")
|
||||
val future = ask(myActor, "hello")
|
||||
for (x ← future) println(x) //Prints "hello"
|
||||
|
||||
val result: Future[Int] = for (x ← Patterns.ask(myActor, 3).mapTo[Int]) yield { 2 * x }
|
||||
val result: Future[Int] = for (x ← ask(myActor, 3).mapTo[Int]) yield { 2 * x }
|
||||
//#using-ask
|
||||
|
||||
system.stop(myActor)
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ import akka.dispatch.Future
|
|||
import akka.dispatch.Await
|
||||
import akka.util.duration._
|
||||
import akka.dispatch.Promise
|
||||
import akka.Patterns
|
||||
|
||||
object FutureDocSpec {
|
||||
|
||||
|
|
@ -45,9 +44,10 @@ class FutureDocSpec extends AkkaSpec {
|
|||
val msg = "hello"
|
||||
//#ask-blocking
|
||||
import akka.dispatch.Await
|
||||
import akka.patterns.ask
|
||||
|
||||
implicit val timeout = system.settings.ActorTimeout
|
||||
val future = Patterns.ask(actor, msg)
|
||||
val future = actor ? msg // enabled by the “ask” import
|
||||
val result = Await.result(future, timeout.duration).asInstanceOf[String]
|
||||
//#ask-blocking
|
||||
result must be("HELLO")
|
||||
|
|
@ -59,8 +59,9 @@ class FutureDocSpec extends AkkaSpec {
|
|||
implicit val timeout = system.settings.ActorTimeout
|
||||
//#map-to
|
||||
import akka.dispatch.Future
|
||||
import akka.patterns.ask
|
||||
|
||||
val future: Future[String] = Patterns.ask(actor, msg).mapTo[String]
|
||||
val future: Future[String] = ask(actor, msg).mapTo[String]
|
||||
//#map-to
|
||||
Await.result(future, timeout.duration) must be("HELLO")
|
||||
}
|
||||
|
|
@ -148,15 +149,16 @@ class FutureDocSpec extends AkkaSpec {
|
|||
val msg2 = 2
|
||||
implicit val timeout = system.settings.ActorTimeout
|
||||
import akka.dispatch.Await
|
||||
import akka.patterns.ask
|
||||
//#composing-wrong
|
||||
|
||||
val f1 = Patterns.ask(actor1, msg1)
|
||||
val f2 = Patterns.ask(actor2, msg2)
|
||||
val f1 = ask(actor1, msg1)
|
||||
val f2 = ask(actor2, msg2)
|
||||
|
||||
val a = Await.result(f1, 1 second).asInstanceOf[Int]
|
||||
val b = Await.result(f2, 1 second).asInstanceOf[Int]
|
||||
|
||||
val f3 = Patterns.ask(actor3, (a + b))
|
||||
val f3 = ask(actor3, (a + b))
|
||||
|
||||
val result = Await.result(f3, 1 second).asInstanceOf[Int]
|
||||
//#composing-wrong
|
||||
|
|
@ -171,15 +173,16 @@ class FutureDocSpec extends AkkaSpec {
|
|||
val msg2 = 2
|
||||
implicit val timeout = system.settings.ActorTimeout
|
||||
import akka.dispatch.Await
|
||||
import akka.patterns.ask
|
||||
//#composing
|
||||
|
||||
val f1 = Patterns.ask(actor1, msg1)
|
||||
val f2 = Patterns.ask(actor2, msg2)
|
||||
val f1 = ask(actor1, msg1)
|
||||
val f2 = ask(actor2, msg2)
|
||||
|
||||
val f3 = for {
|
||||
a ← f1.mapTo[Int]
|
||||
b ← f2.mapTo[Int]
|
||||
c ← Patterns.ask(actor3, (a + b)).mapTo[Int]
|
||||
c ← ask(actor3, (a + b)).mapTo[Int]
|
||||
} yield c
|
||||
|
||||
val result = Await.result(f3, 1 second).asInstanceOf[Int]
|
||||
|
|
@ -192,7 +195,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
val oddActor = system.actorOf(Props[OddActor])
|
||||
//#sequence-ask
|
||||
// oddActor returns odd numbers sequentially from 1 as a List[Future[Int]]
|
||||
val listOfFutures = List.fill(100)(Patterns.ask(oddActor, GetNext).mapTo[Int])
|
||||
val listOfFutures = List.fill(100)(akka.patterns.ask(oddActor, GetNext).mapTo[Int])
|
||||
|
||||
// now we have a Future[List[Int]]
|
||||
val futureList = Future.sequence(listOfFutures)
|
||||
|
|
@ -240,7 +243,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
val actor = system.actorOf(Props[MyActor])
|
||||
val msg1 = -1
|
||||
//#recover
|
||||
val future = Patterns.ask(actor, msg1) recover {
|
||||
val future = akka.patterns.ask(actor, msg1) recover {
|
||||
case e: ArithmeticException ⇒ 0
|
||||
}
|
||||
//#recover
|
||||
|
|
|
|||
|
|
@ -138,7 +138,7 @@ class RemoteActorRefProvider(
|
|||
|
||||
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path)
|
||||
|
||||
def ask(result: Promise[Any], within: Timeout): Option[ActorRef] = local.ask(result, within)
|
||||
def ask(within: Timeout): Option[PromiseActorRef] = local.ask(within)
|
||||
|
||||
/**
|
||||
* Using (checking out) actor on a specific node.
|
||||
|
|
@ -160,12 +160,12 @@ trait RemoteRef extends ActorRefScope {
|
|||
* This reference is network-aware (remembers its origin) and immutable.
|
||||
*/
|
||||
private[akka] class RemoteActorRef private[akka] (
|
||||
override val provider: RemoteActorRefProvider,
|
||||
val provider: RemoteActorRefProvider,
|
||||
remote: RemoteSupport[ParsedTransportAddress],
|
||||
val path: ActorPath,
|
||||
val getParent: InternalActorRef,
|
||||
loader: Option[ClassLoader])
|
||||
extends InternalActorRef with RemoteRef {
|
||||
extends InternalActorRef with RemoteRef with ActorRefWithProvider {
|
||||
|
||||
def getChild(name: Iterator[String]): InternalActorRef = {
|
||||
val s = name.toStream
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue