move ask machinery from ActorRefProvider to pattern.AskSupport

- now everything is assembled in one spot
- also moved PromiseActorRef and AskTimeoutException from akka.actor
- plus a little boy-scouting
This commit is contained in:
Roland 2012-01-18 11:52:35 +01:00
parent 2bed2cb954
commit 00ec3f89dc
11 changed files with 226 additions and 163 deletions

View file

@ -10,7 +10,7 @@ import akka.testkit.DefaultTimeout
import java.util.concurrent.TimeoutException
import akka.dispatch.Await
import akka.util.Timeout
import akka.pattern.ask
import akka.pattern.{ ask, AskTimeoutException }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout {

View file

@ -47,7 +47,7 @@ class ForwardActorSpec extends AkkaSpec {
"forward actor reference when invoking forward on ask" in {
val chain = createForwardingChain(system)
chain.ask(ExpectedMessage)(5000) onSuccess { case ExpectedMessage testActor ! ExpectedMessage }
chain.ask(ExpectedMessage)(5 seconds) onSuccess { case ExpectedMessage testActor ! ExpectedMessage }
expectMsg(5 seconds, ExpectedMessage)
}
}

View file

@ -462,41 +462,3 @@ class VirtualPathContainer(val path: ActorPath, override val getParent: Internal
}
}
}
/**
* This is what is used to complete a Future that is returned from an ask/? call,
* when it times out.
*/
class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException {
def this(message: String) = this(message, null: Throwable)
}
/**
* Akka private optimized representation of the temporary actor spawned to
* receive the reply to an "ask" operation.
*/
private[akka] final class PromiseActorRef(
val path: ActorPath,
override val getParent: InternalActorRef,
val result: Promise[Any],
val deathWatch: DeathWatch) extends MinimalActorRef {
final val running = new AtomicBoolean(true)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (running.get) message match {
case Status.Success(r) result.success(r)
case Status.Failure(f) result.failure(f)
case other result.success(other)
}
override def sendSystemMessage(message: SystemMessage): Unit = message match {
case _: Terminate stop()
case _
}
override def isTerminated = result.isCompleted
override def stop(): Unit = if (running.getAndSet(false)) {
deathWatch.publish(Terminated(this))
}
}

View file

@ -70,6 +70,26 @@ trait ActorRefProvider {
def scheduler: Scheduler
/**
* Generates and returns a unique actor path below /temp.
*/
def tempPath(): ActorPath
/**
* Returns the actor reference representing the /temp path.
*/
def tempContainer: InternalActorRef
/**
* Registers an actorRef at a path returned by tempPath(); do NOT pass in any other path.
*/
def registerTempActor(actorRef: InternalActorRef, path: ActorPath)
/**
* Unregister a temporary actor from the /temp path (i.e. obtained from tempPath()); do NOT pass in any other path.
*/
def unregisterTempActor(path: ActorPath)
/**
* Actor factory with create-only semantics: will create an actor as
* described by props with the given supervisor and path (may be different
@ -100,12 +120,6 @@ trait ActorRefProvider {
*/
def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef
/**
* Create AskActorRef and register it properly so it can be serialized/deserialized;
* caller needs to send the message.
*/
def ask(within: Timeout): Option[PromiseActorRef]
/**
* This Future is completed upon termination of this ActorRefProvider, which
* is usually initiated by stopping the guardian via ActorSystem.stop().
@ -441,6 +455,16 @@ class LocalActorRefProvider(
lazy val tempContainer = new VirtualPathContainer(tempNode, rootGuardian, log)
def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = {
assert(path.parent eq tempNode, "cannot registerTempActor() with anything not obtained from tempPath()")
tempContainer.addChild(path.name, actorRef)
}
def unregisterTempActor(path: ActorPath): Unit = {
assert(path.parent eq tempNode, "cannot unregisterTempActor() with anything not obtained from tempPath()")
tempContainer.removeChild(path.name)
}
val deathWatch = new LocalDeathWatch(1024) //TODO make configrable
def init(_system: ActorSystemImpl) {
@ -493,25 +517,6 @@ class LocalActorRefProvider(
new RoutedActorRef(system, props.withRouter(router.adaptFromDeploy(depl)), supervisor, path)
}
}
def ask(within: Timeout): Option[PromiseActorRef] = {
(if (within == null) settings.ActorTimeout else within) match {
case t if t.duration.length <= 0 None
case t
val path = tempPath()
val name = path.name
val result = Promise[Any]()(dispatcher)
val a = new PromiseActorRef(path, tempContainer, result, deathWatch)
tempContainer.addChild(name, a)
val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) }
result onComplete { _
try { a.stop(); f.cancel() }
finally { tempContainer.removeChild(name) }
}
Some(a)
}
}
}
class LocalDeathWatch(val mapSize: Int) extends DeathWatch with ActorClassification {

View file

@ -0,0 +1,142 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.pattern
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.TimeoutException
import akka.actor.{ Terminated, Status, MinimalActorRef, InternalActorRef, ActorRef, ActorPath }
import akka.dispatch.{ Promise, Terminate, SystemMessage, Future }
import akka.event.DeathWatch
import akka.actor.ActorRefProvider
import akka.util.Timeout
/**
* This is what is used to complete a Future that is returned from an ask/? call,
* when it times out.
*/
class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException {
def this(message: String) = this(message, null: Throwable)
}
object AskSupport {
final class AskableActorRef(val actorRef: ActorRef) {
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* val f = worker.ask(request)(timeout)
* flow {
* EnrichedRequest(request, f())
* } pipeTo nextActor
* }}}
*
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
*/
def ask(message: AnyRef)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* val f = worker ? request
* flow {
* EnrichedRequest(request, f())
* } pipeTo nextActor
* }}}
*
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
*/
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)
/*
* FIXME: I think this should be removed, since it introduces an ambiguity
* when sending Tuple2, which the compiler resolves unexpectedly to this
* method; also overloading is bad, isnt it? - RK (ticket #1653)
*/
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
* The implicit parameter with the default value is just there to disambiguate it from the version that takes the
* implicit timeout
*/
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
}
/**
* Akka private optimized representation of the temporary actor spawned to
* receive the reply to an "ask" operation.
*/
private[akka] final class PromiseActorRef(
val path: ActorPath,
override val getParent: InternalActorRef,
val result: Promise[Any],
val deathWatch: DeathWatch) extends MinimalActorRef {
final val running = new AtomicBoolean(true)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (running.get) message match {
case Status.Success(r) result.success(r)
case Status.Failure(f) result.failure(f)
case other result.success(other)
}
override def sendSystemMessage(message: SystemMessage): Unit = message match {
case _: Terminate stop()
case _
}
override def isTerminated = result.isCompleted
override def stop(): Unit = if (running.getAndSet(false)) {
deathWatch.publish(Terminated(this))
}
}
def createAsker(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = {
val path = provider.tempPath()
val result = Promise[Any]()(provider.dispatcher)
val a = new PromiseActorRef(path, provider.tempContainer, result, provider.deathWatch)
provider.registerTempActor(a, path)
val f = provider.scheduler.scheduleOnce(timeout.duration) { result.failure(new AskTimeoutException("Timed out")) }
result onComplete { _
try { a.stop(); f.cancel() }
finally { provider.unregisterTempActor(path) }
}
a
}
}

View file

@ -1,84 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.pattern
import akka.actor.{ ActorRef, InternalActorRef, AskTimeoutException }
import akka.dispatch.{ Future, Promise }
import akka.util.Timeout
final class AskableActorRef(val actorRef: ActorRef) {
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* val f = worker.ask(request)(timeout)
* flow {
* EnrichedRequest(request, f())
* } pipeTo nextActor
* }}}
*
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
*/
def ask(message: AnyRef)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* val f = worker ? request
* flow {
* EnrichedRequest(request, f())
* } pipeTo nextActor
* }}}
*
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
*/
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)
/*
* FIXME: I think this should be removed, since it introduces an ambiguity
* when sending Tuple2, which the compiler resolves unexpectedly to this
* method; also overloading is bad, isnt it? - RK (ticket #1653)
*/
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
* The implicit parameter with the default value is just there to disambiguate it from the version that takes the
* implicit timeout
*/
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
}

View file

@ -9,6 +9,38 @@ object Patterns {
import akka.pattern.{ ask scalaAsk }
import akka.util.Timeout
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>This variant will use the `akka.actor.timeout` from the configuration.</b>
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* final Future<Object> f = Patterns.ask(worker, request, timeout);
* f.onSuccess(new Procedure<Object>() {
* public void apply(Object o) {
* nextActor.tell(new EnrichedResult(request, o));
* }
* });
* }}}
*/
def ask(actor: ActorRef, message: Any): Future[AnyRef] = scalaAsk(actor, message).asInstanceOf[Future[AnyRef]]
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor

View file

@ -5,7 +5,7 @@ package akka
package object pattern {
import akka.actor.{ ActorRef, InternalActorRef, ActorRefWithProvider, AskTimeoutException }
import akka.actor.{ ActorRef, InternalActorRef, ActorRefWithProvider }
import akka.dispatch.{ Future, Promise }
import akka.util.Timeout
@ -24,7 +24,7 @@ package object pattern {
*
* All of the above use an implicit [[akka.actor.Timeout]].
*/
implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef)
implicit def ask(actorRef: ActorRef): AskSupport.AskableActorRef = new AskSupport.AskableActorRef(actorRef)
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
@ -54,15 +54,17 @@ package object pattern {
*
* [see [[akka.dispatch.Future]] for a description of `flow`]
*/
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout = null): Future[Any] = actorRef match {
case ref: ActorRefWithProvider
ref.provider.ask(timeout) match {
case Some(ref)
actorRef.tell(message, ref)
ref.result
case None
val provider = ref.provider
(if (timeout == null) provider.settings.ActorTimeout else timeout) match {
case t if t.duration.length <= 0
actorRef.tell(message)
Promise.failed(new AskTimeoutException("failed to create PromiseActorRef"))(ref.provider.dispatcher)
Promise.failed(new AskTimeoutException("failed to create PromiseActorRef"))(provider.dispatcher)
case t
val a = AskSupport.createAsker(provider, t)
actorRef.tell(message, a)
a.result
}
case _ throw new IllegalArgumentException("incompatible ActorRef " + actorRef)
}

View file

@ -9,6 +9,7 @@ import scala.collection.JavaConversions._
import akka.util.{ Duration, Timeout }
import akka.config.ConfigurationException
import akka.dispatch.Promise
import akka.pattern.AskSupport
/**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
@ -404,7 +405,7 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒
{
case (sender, message)
val provider: ActorRefProvider = context.asInstanceOf[ActorCell].systemImpl.provider
val asker = provider.ask(Timeout(within)).get
val asker = AskSupport.createAsker(provider, within)
asker.result.pipeTo(sender)
toAll(asker, ref.routees)
}

View file

@ -39,6 +39,11 @@ class RemoteActorRefProvider(
def terminationFuture = local.terminationFuture
def dispatcher = local.dispatcher
def registerTempActor(actorRef: InternalActorRef, path: ActorPath) = local.registerTempActor(actorRef, path)
def unregisterTempActor(path: ActorPath) = local.unregisterTempActor(path)
def tempPath() = local.tempPath()
def tempContainer = local.tempContainer
val deployer = new RemoteDeployer(settings)
val remote = new Remote(settings, remoteSettings)
@ -138,8 +143,6 @@ class RemoteActorRefProvider(
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path)
def ask(within: Timeout): Option[PromiseActorRef] = local.ask(within)
/**
* Using (checking out) actor on a specific node.
*/

View file

@ -83,7 +83,7 @@ akka {
"support ask" in {
Await.result(here ? "ping", timeout.duration) match {
case ("pong", s: PromiseActorRef) // good
case ("pong", s: akka.pattern.AskSupport.PromiseActorRef) // good
case m fail(m + " was not (pong, AskActorRef)")
}
}