diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala index e5b2303140..de1dd88923 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala @@ -10,7 +10,7 @@ import akka.testkit.DefaultTimeout import java.util.concurrent.TimeoutException import akka.dispatch.Await import akka.util.Timeout -import akka.pattern.ask +import akka.pattern.{ ask, AskTimeoutException } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala index 95ecfe1280..cc5eb4c3a9 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala @@ -47,7 +47,7 @@ class ForwardActorSpec extends AkkaSpec { "forward actor reference when invoking forward on ask" in { val chain = createForwardingChain(system) - chain.ask(ExpectedMessage)(5000) onSuccess { case ExpectedMessage ⇒ testActor ! ExpectedMessage } + chain.ask(ExpectedMessage)(5 seconds) onSuccess { case ExpectedMessage ⇒ testActor ! ExpectedMessage } expectMsg(5 seconds, ExpectedMessage) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 9ebae64e62..a053608a1b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -462,41 +462,3 @@ class VirtualPathContainer(val path: ActorPath, override val getParent: Internal } } } - -/** - * This is what is used to complete a Future that is returned from an ask/? call, - * when it times out. - */ -class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException { - def this(message: String) = this(message, null: Throwable) -} - -/** - * Akka private optimized representation of the temporary actor spawned to - * receive the reply to an "ask" operation. - */ -private[akka] final class PromiseActorRef( - val path: ActorPath, - override val getParent: InternalActorRef, - val result: Promise[Any], - val deathWatch: DeathWatch) extends MinimalActorRef { - - final val running = new AtomicBoolean(true) - - override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (running.get) message match { - case Status.Success(r) ⇒ result.success(r) - case Status.Failure(f) ⇒ result.failure(f) - case other ⇒ result.success(other) - } - - override def sendSystemMessage(message: SystemMessage): Unit = message match { - case _: Terminate ⇒ stop() - case _ ⇒ - } - - override def isTerminated = result.isCompleted - - override def stop(): Unit = if (running.getAndSet(false)) { - deathWatch.publish(Terminated(this)) - } -} diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 4408f7562e..50478a248f 100755 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -70,6 +70,26 @@ trait ActorRefProvider { def scheduler: Scheduler + /** + * Generates and returns a unique actor path below “/temp”. + */ + def tempPath(): ActorPath + + /** + * Returns the actor reference representing the “/temp” path. + */ + def tempContainer: InternalActorRef + + /** + * Registers an actorRef at a path returned by tempPath(); do NOT pass in any other path. + */ + def registerTempActor(actorRef: InternalActorRef, path: ActorPath) + + /** + * Unregister a temporary actor from the “/temp” path (i.e. obtained from tempPath()); do NOT pass in any other path. + */ + def unregisterTempActor(path: ActorPath) + /** * Actor factory with create-only semantics: will create an actor as * described by props with the given supervisor and path (may be different @@ -100,12 +120,6 @@ trait ActorRefProvider { */ def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef - /** - * Create AskActorRef and register it properly so it can be serialized/deserialized; - * caller needs to send the message. - */ - def ask(within: Timeout): Option[PromiseActorRef] - /** * This Future is completed upon termination of this ActorRefProvider, which * is usually initiated by stopping the guardian via ActorSystem.stop(). @@ -441,6 +455,16 @@ class LocalActorRefProvider( lazy val tempContainer = new VirtualPathContainer(tempNode, rootGuardian, log) + def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = { + assert(path.parent eq tempNode, "cannot registerTempActor() with anything not obtained from tempPath()") + tempContainer.addChild(path.name, actorRef) + } + + def unregisterTempActor(path: ActorPath): Unit = { + assert(path.parent eq tempNode, "cannot unregisterTempActor() with anything not obtained from tempPath()") + tempContainer.removeChild(path.name) + } + val deathWatch = new LocalDeathWatch(1024) //TODO make configrable def init(_system: ActorSystemImpl) { @@ -493,25 +517,6 @@ class LocalActorRefProvider( new RoutedActorRef(system, props.withRouter(router.adaptFromDeploy(depl)), supervisor, path) } } - - def ask(within: Timeout): Option[PromiseActorRef] = { - (if (within == null) settings.ActorTimeout else within) match { - case t if t.duration.length <= 0 ⇒ None - case t ⇒ - val path = tempPath() - val name = path.name - val result = Promise[Any]()(dispatcher) - val a = new PromiseActorRef(path, tempContainer, result, deathWatch) - tempContainer.addChild(name, a) - val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) } - result onComplete { _ ⇒ - try { a.stop(); f.cancel() } - finally { tempContainer.removeChild(name) } - } - - Some(a) - } - } } class LocalDeathWatch(val mapSize: Int) extends DeathWatch with ActorClassification { diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala new file mode 100644 index 0000000000..bc79877fc3 --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -0,0 +1,142 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.pattern + +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.TimeoutException +import akka.actor.{ Terminated, Status, MinimalActorRef, InternalActorRef, ActorRef, ActorPath } +import akka.dispatch.{ Promise, Terminate, SystemMessage, Future } +import akka.event.DeathWatch +import akka.actor.ActorRefProvider +import akka.util.Timeout + +/** + * This is what is used to complete a Future that is returned from an ask/? call, + * when it times out. + */ +class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException { + def this(message: String) = this(message, null: Throwable) +} + +object AskSupport { + + final class AskableActorRef(val actorRef: ActorRef) { + + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * val f = worker.ask(request)(timeout) + * flow { + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] + */ + def ask(message: AnyRef)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout) + + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * val f = worker ? request + * flow { + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] + */ + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message) + + /* + * FIXME: I think this should be removed, since it introduces an “ambiguity” + * when sending Tuple2, which the compiler resolves unexpectedly to this + * method; also overloading is bad, isn’t it? - RK (ticket #1653) + */ + /** + * Sends a message asynchronously, returning a future which may eventually hold the reply. + * The implicit parameter with the default value is just there to disambiguate it from the version that takes the + * implicit timeout + */ + def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) + + } + + /** + * Akka private optimized representation of the temporary actor spawned to + * receive the reply to an "ask" operation. + */ + private[akka] final class PromiseActorRef( + val path: ActorPath, + override val getParent: InternalActorRef, + val result: Promise[Any], + val deathWatch: DeathWatch) extends MinimalActorRef { + + final val running = new AtomicBoolean(true) + + override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (running.get) message match { + case Status.Success(r) ⇒ result.success(r) + case Status.Failure(f) ⇒ result.failure(f) + case other ⇒ result.success(other) + } + + override def sendSystemMessage(message: SystemMessage): Unit = message match { + case _: Terminate ⇒ stop() + case _ ⇒ + } + + override def isTerminated = result.isCompleted + + override def stop(): Unit = if (running.getAndSet(false)) { + deathWatch.publish(Terminated(this)) + } + } + + def createAsker(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = { + val path = provider.tempPath() + val result = Promise[Any]()(provider.dispatcher) + val a = new PromiseActorRef(path, provider.tempContainer, result, provider.deathWatch) + provider.registerTempActor(a, path) + val f = provider.scheduler.scheduleOnce(timeout.duration) { result.failure(new AskTimeoutException("Timed out")) } + result onComplete { _ ⇒ + try { a.stop(); f.cancel() } + finally { provider.unregisterTempActor(path) } + } + a + } +} diff --git a/akka-actor/src/main/scala/akka/pattern/AskableActorRef.scala b/akka-actor/src/main/scala/akka/pattern/AskableActorRef.scala deleted file mode 100644 index faa4c4d5c0..0000000000 --- a/akka-actor/src/main/scala/akka/pattern/AskableActorRef.scala +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ -package akka.pattern - -import akka.actor.{ ActorRef, InternalActorRef, AskTimeoutException } -import akka.dispatch.{ Future, Promise } -import akka.util.Timeout - -final class AskableActorRef(val actorRef: ActorRef) { - - /** - * Sends a message asynchronously and returns a [[akka.dispatch.Future]] - * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.actor.AskTimeoutException]] after the - * given timeout has expired; this is independent from any timeout applied - * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). - * - * Warning: - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s object, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - * - * Recommended usage: - * - * {{{ - * val f = worker.ask(request)(timeout) - * flow { - * EnrichedRequest(request, f()) - * } pipeTo nextActor - * }}} - * - * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] - */ - def ask(message: AnyRef)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout) - - /** - * Sends a message asynchronously and returns a [[akka.dispatch.Future]] - * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.actor.AskTimeoutException]] after the - * given timeout has expired; this is independent from any timeout applied - * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). - * - * Warning: - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s object, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - * - * Recommended usage: - * - * {{{ - * val f = worker ? request - * flow { - * EnrichedRequest(request, f()) - * } pipeTo nextActor - * }}} - * - * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] - */ - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message) - - /* - * FIXME: I think this should be removed, since it introduces an “ambiguity” - * when sending Tuple2, which the compiler resolves unexpectedly to this - * method; also overloading is bad, isn’t it? - RK (ticket #1653) - */ - /** - * Sends a message asynchronously, returning a future which may eventually hold the reply. - * The implicit parameter with the default value is just there to disambiguate it from the version that takes the - * implicit timeout - */ - def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) - -} diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index bc86877f40..9f65728517 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -9,6 +9,38 @@ object Patterns { import akka.pattern.{ ask ⇒ scalaAsk } import akka.util.Timeout + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * This variant will use the `akka.actor.timeout` from the configuration. + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * final Future f = Patterns.ask(worker, request, timeout); + * f.onSuccess(new Procedure() { + * public void apply(Object o) { + * nextActor.tell(new EnrichedResult(request, o)); + * } + * }); + * }}} + */ + def ask(actor: ActorRef, message: Any): Future[AnyRef] = scalaAsk(actor, message).asInstanceOf[Future[AnyRef]] + /** * Sends a message asynchronously and returns a [[akka.dispatch.Future]] * holding the eventual reply message; this means that the target actor diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala index bfd6384c21..5bc426a67d 100644 --- a/akka-actor/src/main/scala/akka/pattern/package.scala +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -5,7 +5,7 @@ package akka package object pattern { - import akka.actor.{ ActorRef, InternalActorRef, ActorRefWithProvider, AskTimeoutException } + import akka.actor.{ ActorRef, InternalActorRef, ActorRefWithProvider } import akka.dispatch.{ Future, Promise } import akka.util.Timeout @@ -24,7 +24,7 @@ package object pattern { * * All of the above use an implicit [[akka.actor.Timeout]]. */ - implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef) + implicit def ask(actorRef: ActorRef): AskSupport.AskableActorRef = new AskSupport.AskableActorRef(actorRef) /** * Sends a message asynchronously and returns a [[akka.dispatch.Future]] @@ -54,15 +54,17 @@ package object pattern { * * [see [[akka.dispatch.Future]] for a description of `flow`] */ - def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match { + def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout = null): Future[Any] = actorRef match { case ref: ActorRefWithProvider ⇒ - ref.provider.ask(timeout) match { - case Some(ref) ⇒ - actorRef.tell(message, ref) - ref.result - case None ⇒ + val provider = ref.provider + (if (timeout == null) provider.settings.ActorTimeout else timeout) match { + case t if t.duration.length <= 0 ⇒ actorRef.tell(message) - Promise.failed(new AskTimeoutException("failed to create PromiseActorRef"))(ref.provider.dispatcher) + Promise.failed(new AskTimeoutException("failed to create PromiseActorRef"))(provider.dispatcher) + case t ⇒ + val a = AskSupport.createAsker(provider, t) + actorRef.tell(message, a) + a.result } case _ ⇒ throw new IllegalArgumentException("incompatible ActorRef " + actorRef) } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 11f8fee4af..c2f7faaae2 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -9,6 +9,7 @@ import scala.collection.JavaConversions._ import akka.util.{ Duration, Timeout } import akka.config.ConfigurationException import akka.dispatch.Promise +import akka.pattern.AskSupport /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to @@ -404,7 +405,7 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ { case (sender, message) ⇒ val provider: ActorRefProvider = context.asInstanceOf[ActorCell].systemImpl.provider - val asker = provider.ask(Timeout(within)).get + val asker = AskSupport.createAsker(provider, within) asker.result.pipeTo(sender) toAll(asker, ref.routees) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 65417d9063..fe81dc431e 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -39,6 +39,11 @@ class RemoteActorRefProvider( def terminationFuture = local.terminationFuture def dispatcher = local.dispatcher + def registerTempActor(actorRef: InternalActorRef, path: ActorPath) = local.registerTempActor(actorRef, path) + def unregisterTempActor(path: ActorPath) = local.unregisterTempActor(path) + def tempPath() = local.tempPath() + def tempContainer = local.tempContainer + val deployer = new RemoteDeployer(settings) val remote = new Remote(settings, remoteSettings) @@ -138,8 +143,6 @@ class RemoteActorRefProvider( def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path) - def ask(within: Timeout): Option[PromiseActorRef] = local.ask(within) - /** * Using (checking out) actor on a specific node. */ diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index 4f1fb545c0..d9d815d018 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -83,8 +83,8 @@ akka { "support ask" in { Await.result(here ? "ping", timeout.duration) match { - case ("pong", s: PromiseActorRef) ⇒ // good - case m ⇒ fail(m + " was not (pong, AskActorRef)") + case ("pong", s: akka.pattern.AskSupport.PromiseActorRef) ⇒ // good + case m ⇒ fail(m + " was not (pong, AskActorRef)") } }