From 020c6b61dac47168e961943b46abff80fb875326 Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 19 Jan 2012 15:13:10 +0100 Subject: [PATCH] move all Ask stuff to akka.pattern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - remove ?(msg, timeout), should always use ?(msg)(timeout) because of Scala’s only Martin-acknowledged design flaw of being able to pass tuples into single-arg methods without adding another pair of parens - put a provider into all actor refs, because they all are created by and associated with one - treat all terminated refs equally: tell(msg) and return broken promise --- .../scala/akka/actor/ActorLookupSpec.scala | 10 ++-- .../test/scala/akka/actor/ActorRefSpec.scala | 11 ++-- .../scala/akka/actor/ActorTimeoutSpec.scala | 2 +- .../scala/akka/actor/SupervisorSpec.scala | 10 ++-- .../test/scala/akka/dispatch/FutureSpec.scala | 12 ++-- .../src/test/scala/akka/pattern/AskSpec.scala | 35 ++++++++++++ .../src/main/scala/akka/actor/ActorRef.scala | 57 +++++++++++++------ .../scala/akka/actor/ActorRefProvider.scala | 6 +- .../main/scala/akka/actor/ActorSystem.scala | 4 +- .../src/main/scala/akka/actor/Locker.scala | 7 ++- .../main/scala/akka/actor/TypedActor.scala | 6 +- .../src/main/scala/akka/event/Logging.scala | 1 + .../main/scala/akka/pattern/AskSupport.scala | 20 ++----- .../src/main/scala/akka/pattern/package.scala | 17 +++--- .../src/main/scala/akka/util/Duration.scala | 1 - .../src/main/scala/akka/agent/Agent.scala | 4 +- .../code/akka/docs/actor/ActorDocSpec.scala | 2 +- .../src/main/scala/akka/remote/Remote.scala | 2 +- .../akka/remote/RemoteActorRefProvider.scala | 2 +- 19 files changed, 134 insertions(+), 75 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala index 135a7efe0a..f8bb976b3c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala @@ -40,11 +40,13 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { val c2 = system.actorOf(p, "c2") val c21 = Await.result((c2 ? Create("c21")).mapTo[ActorRef], timeout.duration) - val user = system.asInstanceOf[ActorSystemImpl].guardian - val syst = system.asInstanceOf[ActorSystemImpl].systemGuardian - val root = system.asInstanceOf[ActorSystemImpl].lookupRoot + val sysImpl = system.asInstanceOf[ActorSystemImpl] - def empty(path: String) = new EmptyLocalActorRef(system.eventStream, system.dispatcher, path match { + val user = sysImpl.guardian + val syst = sysImpl.systemGuardian + val root = sysImpl.lookupRoot + + def empty(path: String) = new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, path match { case RelativeActorPath(elems) ⇒ system.actorFor("/").path / elems }) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 8dbd1c9a8c..a46b0a3a42 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -288,7 +288,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { val baos = new ByteArrayOutputStream(8192 * 32) val out = new ObjectOutputStream(baos) - val addr = system.asInstanceOf[ActorSystemImpl].provider.rootPath.address + val sysImpl = system.asInstanceOf[ActorSystemImpl] + val addr = sysImpl.provider.rootPath.address val serialized = SerializedActorRef(addr + "/non-existing") out.writeObject(serialized) @@ -296,9 +297,9 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { out.flush out.close - Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { + Serialization.currentSystem.withValue(sysImpl) { val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) - in.readObject must be === new EmptyLocalActorRef(system.eventStream, system.dispatcher, system.actorFor("/").path / "non-existing") + in.readObject must be === new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, system.actorFor("/").path / "non-existing") } } @@ -359,8 +360,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { } })) - val ffive = (ref ? (5, timeout)).mapTo[String] - val fnull = (ref ? (null, timeout)).mapTo[String] + val ffive = (ref.ask(5)(timeout)).mapTo[String] + val fnull = (ref.ask(null)(timeout)).mapTo[String] ref ! PoisonPill Await.result(ffive, timeout.duration) must be("five") diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala index d8977aa25d..dac38a7481 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala @@ -45,7 +45,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo "use explicitly supplied timeout" in { within(testTimeout - 100.millis, testTimeout + 300.millis) { val echo = system.actorOf(Props.empty) - val f = echo.?("hallo", testTimeout) + val f = echo.?("hallo")(testTimeout) try { intercept[AskTimeoutException] { Await.result(f, testTimeout + 300.millis) } } finally { system.stop(echo) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 73ff90c3c7..5995c47d7b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -129,12 +129,12 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende } def ping(pingPongActor: ActorRef) = { - Await.result(pingPongActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage + Await.result(pingPongActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage expectMsg(Timeout, PingMessage) } def kill(pingPongActor: ActorRef) = { - val result = (pingPongActor ? (DieReply, TimeoutMillis)) + val result = (pingPongActor.?(DieReply)(TimeoutMillis)) expectMsg(Timeout, ExceptionMessage) intercept[RuntimeException] { Await.result(result, TimeoutMillis millis) } } @@ -152,7 +152,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "not restart temporary actor" in { val (temporaryActor, _) = temporaryActorAllForOne - intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) } + intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis) } expectNoMsg(1 second) } @@ -298,11 +298,11 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1), EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) { intercept[RuntimeException] { - Await.result(dyingActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) + Await.result(dyingActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis) } } - Await.result(dyingActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage + Await.result(dyingActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage inits.get must be(3) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 2ee7c4e6c7..44509b270f 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -324,7 +324,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa })) } val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(timeout).mapTo[Int] } Await.result(Future.fold(futures)(0)(_ + _), timeout millis) must be(45) } @@ -334,7 +334,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); sender.tell(add) } })) } - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), 10000).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(10000).mapTo[Int] } Await.result(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)), timeout.duration) must be(45) } @@ -351,7 +351,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa })) } val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100))(timeout).mapTo[Int] } intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected") } } @@ -383,7 +383,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa })) } val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(timeout).mapTo[Int] } assert(Await.result(Future.reduce(futures)(_ + _), timeout millis) === 45) } @@ -400,7 +400,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa })) } val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100))(timeout).mapTo[Int] } intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected" } } @@ -441,7 +441,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "shouldHandleThrowables" in { class ThrowableTest(m: String) extends Throwable(m) - filterException[ThrowableTest] { + EventFilter[ThrowableTest](occurrences = 4) intercept { val f1 = Future[Any] { throw new ThrowableTest("test") } intercept[ThrowableTest] { Await.result(f1, timeout.duration) } diff --git a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala new file mode 100644 index 0000000000..44cdd91eba --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.pattern + +import akka.testkit.AkkaSpec + +class AskSpec extends AkkaSpec { + + "The “ask” pattern" must { + + "return broken promises on DeadLetters" in { + val dead = system.actorFor("/system/deadLetters") + val f = dead ask 42 + f.isCompleted must be(true) + f.value.get match { + case Left(_: AskTimeoutException) ⇒ + case v ⇒ fail(v + " was not Left(AskTimeoutException)") + } + } + + "return broken promises on EmptyLocalActorRefs" in { + val empty = system.actorFor("unknown") + implicit val timeout = system.settings.ActorTimeout + val f = empty ? 3.14 + f.isCompleted must be(true) + f.value.get match { + case Left(_: AskTimeoutException) ⇒ + case v ⇒ fail(v + " was not Left(AskTimeoutException)") + } + } + + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 5d07a662b3..cbb21bff78 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -167,13 +167,6 @@ trait LocalRef extends ActorRefScope { final def isLocal = true } -/** - * Trait for matching on ActorRefs which have access to a provider; this is used in akka.pattern.ask. - */ -trait ActorRefWithProvider { this: InternalActorRef ⇒ - def provider: ActorRefProvider -} - /** * Internal trait for assembling all the functionality needed internally on * ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE! @@ -181,12 +174,25 @@ trait ActorRefWithProvider { this: InternalActorRef ⇒ * DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA! */ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope ⇒ + /* + * Actor life-cycle management, invoked only internally (in response to user requests via ActorContext). + */ def resume(): Unit def suspend(): Unit def restart(cause: Throwable): Unit def stop(): Unit def sendSystemMessage(message: SystemMessage): Unit + + /** + * Get a reference to the actor ref provider which created this ref. + */ + def provider: ActorRefProvider + + /** + * Obtain parent of this ref; used by getChild for ".." paths. + */ def getParent: InternalActorRef + /** * Obtain ActorRef by possibly traversing the actor tree or looking it up at * some provider-specific location. This method shall return the end result, @@ -196,6 +202,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe * exist, return Nobody. */ def getChild(name: Iterator[String]): InternalActorRef + /** * Scope: if this ref points to an actor which resides within the same JVM, * i.e. whose mailbox is directly reachable etc. @@ -203,8 +210,12 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe def isLocal: Boolean } +/** + * This is an internal look-up failure token, not useful for anything else. + */ private[akka] case object Nobody extends MinimalActorRef { val path = new RootActorPath(new LocalAddress("all-systems"), "/Nobody") + def provider = throw new UnsupportedOperationException("Nobody does not provide") } /** @@ -218,7 +229,7 @@ private[akka] class LocalActorRef private[akka] ( val systemService: Boolean = false, _receiveTimeout: Option[Duration] = None, _hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap) - extends InternalActorRef with LocalRef with ActorRefWithProvider { + extends InternalActorRef with LocalRef { /* * actorCell.start() publishes actorCell & this to the dispatcher, which @@ -371,8 +382,9 @@ trait MinimalActorRef extends InternalActorRef with LocalRef { } object MinimalActorRef { - def apply(_path: ActorPath)(receive: PartialFunction[Any, Unit]): ActorRef = new MinimalActorRef { + def apply(_path: ActorPath, _provider: ActorRefProvider)(receive: PartialFunction[Any, Unit]): ActorRef = new MinimalActorRef { def path = _path + def provider = _provider override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (receive.isDefinedAt(message)) receive(message) } @@ -393,8 +405,6 @@ trait DeadLetterActorRefLike extends MinimalActorRef { def eventStream: EventStream - @volatile - private var brokenPromise: Future[Any] = _ @volatile private var _path: ActorPath = _ def path: ActorPath = { @@ -402,9 +412,13 @@ trait DeadLetterActorRefLike extends MinimalActorRef { _path } - private[akka] def init(dispatcher: MessageDispatcher, path: ActorPath) { + @volatile + private var _provider: ActorRefProvider = _ + def provider = _provider + + private[akka] def init(provider: ActorRefProvider, path: ActorPath) { _path = path - brokenPromise = Promise.failed(new ActorKilledException("In DeadLetterActorRef - promises are always broken."))(dispatcher) + _provider = provider } override def isTerminated(): Boolean = true @@ -426,16 +440,25 @@ class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRe * This special dead letter reference has a name: it is that which is returned * by a local look-up which is unsuccessful. */ -class EmptyLocalActorRef(val eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath) - extends DeadLetterActorRefLike { - init(_dispatcher, _path) +class EmptyLocalActorRef( + val eventStream: EventStream, + _provider: ActorRefProvider, + _dispatcher: MessageDispatcher, + _path: ActorPath) extends DeadLetterActorRefLike { + + init(_provider, _path) + override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { case d: DeadLetter ⇒ // do NOT form endless loops case _ ⇒ eventStream.publish(DeadLetter(message, sender, this)) } } -class VirtualPathContainer(val path: ActorPath, override val getParent: InternalActorRef, val log: LoggingAdapter) extends MinimalActorRef { +class VirtualPathContainer( + val provider: ActorRefProvider, + val path: ActorPath, + override val getParent: InternalActorRef, + val log: LoggingAdapter) extends MinimalActorRef { private val children = new ConcurrentHashMap[String, InternalActorRef] diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 86673d259b..767309d6e0 100755 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -324,6 +324,8 @@ class LocalActorRefProvider( val path = rootPath / "bubble-walker" + def provider: ActorRefProvider = LocalActorRefProvider.this + override def stop() = stopped switchOn { terminationFuture.complete(causeOfTermination.toLeft(())) } @@ -440,7 +442,7 @@ class LocalActorRefProvider( lazy val systemGuardian: InternalActorRef = actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true, None) - lazy val tempContainer = new VirtualPathContainer(tempNode, rootGuardian, log) + lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log) def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = { assert(path.parent eq tempNode, "cannot registerTempActor() with anything not obtained from tempPath()") @@ -489,7 +491,7 @@ class LocalActorRefProvider( } else ref.getChild(path.iterator) match { case Nobody ⇒ log.debug("look-up of path sequence '{}' failed", path) - new EmptyLocalActorRef(eventStream, dispatcher, ref.path / path) + new EmptyLocalActorRef(eventStream, system.provider, dispatcher, ref.path / path) case x ⇒ x } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 04084e2c2c..8ff4ebe373 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -377,10 +377,10 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor // the provider is expected to start default loggers, LocalActorRefProvider does this provider.init(this) _log = new BusLogging(eventStream, "ActorSystem(" + lookupRoot.path.address + ")", this.getClass) - deadLetters.init(dispatcher, lookupRoot.path / "deadLetters") + deadLetters.init(provider, lookupRoot.path / "deadLetters") // this starts the reaper actor and the user-configured logging subscribers, which are also actors registerOnTermination(stopScheduler()) - _locker = new Locker(scheduler, ReaperInterval, lookupRoot.path / "locker", deathWatch) + _locker = new Locker(scheduler, ReaperInterval, provider, lookupRoot.path / "locker", deathWatch) loadExtensions() if (LogConfigOnStart) logConfiguration() this diff --git a/akka-actor/src/main/scala/akka/actor/Locker.scala b/akka-actor/src/main/scala/akka/actor/Locker.scala index d4fd1badd5..ee6ea0c344 100644 --- a/akka-actor/src/main/scala/akka/actor/Locker.scala +++ b/akka-actor/src/main/scala/akka/actor/Locker.scala @@ -9,7 +9,12 @@ import akka.util.duration._ import java.util.concurrent.ConcurrentHashMap import akka.event.DeathWatch -class Locker(scheduler: Scheduler, period: Duration, val path: ActorPath, val deathWatch: DeathWatch) extends MinimalActorRef { +class Locker( + scheduler: Scheduler, + period: Duration, + val provider: ActorRefProvider, + val path: ActorPath, + val deathWatch: DeathWatch) extends MinimalActorRef { class DavyJones extends Runnable { def run = { diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index ccb5819074..8bc457904e 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -335,15 +335,15 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi import akka.pattern.ask MethodCall(method, args) match { case m if m.isOneWay ⇒ actor ! m; null //Null return value - case m if m.returnsFuture_? ⇒ actor.?(m, timeout) + case m if m.returnsFuture_? ⇒ ask(actor, m)(timeout) case m if m.returnsJOption_? || m.returnsOption_? ⇒ - val f = actor.?(m, timeout) + val f = ask(actor, m)(timeout) (try { Await.ready(f, timeout.duration).value } catch { case _: TimeoutException ⇒ None }) match { case None | Some(Right(null)) ⇒ if (m.returnsJOption_?) JOption.none[Any] else None case Some(Right(joption: AnyRef)) ⇒ joption case Some(Left(ex)) ⇒ throw ex } - case m ⇒ Await.result(actor.?(m, timeout), timeout.duration).asInstanceOf[AnyRef] + case m ⇒ Await.result(ask(actor, m)(timeout), timeout.duration).asInstanceOf[AnyRef] } } } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 854ac625ab..cb8f9df29e 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -649,6 +649,7 @@ object Logging { */ class StandardOutLogger extends MinimalActorRef with StdOutLogger { val path: ActorPath = new RootActorPath(LocalAddress("all-systems"), "/StandardOutLogger") + def provider: ActorRefProvider = throw new UnsupportedOperationException("StandardOutLogger does not provide") override val toString = "StandardOutLogger" override def !(message: Any)(implicit sender: ActorRef = null): Unit = print(message) } diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index bc79877fc3..ca07ea5052 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -51,7 +51,7 @@ object AskSupport { * * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] */ - def ask(message: AnyRef)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout) + def ask(message: Any)(implicit timeout: Timeout = null): Future[Any] = akka.pattern.ask(actorRef, message)(timeout) /** * Sends a message asynchronously and returns a [[akka.dispatch.Future]] @@ -81,20 +81,7 @@ object AskSupport { * * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] */ - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message) - - /* - * FIXME: I think this should be removed, since it introduces an “ambiguity” - * when sending Tuple2, which the compiler resolves unexpectedly to this - * method; also overloading is bad, isn’t it? - RK (ticket #1653) - */ - /** - * Sends a message asynchronously, returning a future which may eventually hold the reply. - * The implicit parameter with the default value is just there to disambiguate it from the version that takes the - * implicit timeout - */ - def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) - + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout) } /** @@ -102,6 +89,7 @@ object AskSupport { * receive the reply to an "ask" operation. */ private[akka] final class PromiseActorRef( + val provider: ActorRefProvider, val path: ActorPath, override val getParent: InternalActorRef, val result: Promise[Any], @@ -130,7 +118,7 @@ object AskSupport { def createAsker(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = { val path = provider.tempPath() val result = Promise[Any]()(provider.dispatcher) - val a = new PromiseActorRef(path, provider.tempContainer, result, provider.deathWatch) + val a = new PromiseActorRef(provider, path, provider.tempContainer, result, provider.deathWatch) provider.registerTempActor(a, path) val f = provider.scheduler.scheduleOnce(timeout.duration) { result.failure(new AskTimeoutException("Timed out")) } result onComplete { _ ⇒ diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala index 6aaff00fd4..4abb7c41d7 100644 --- a/akka-actor/src/main/scala/akka/pattern/package.scala +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -3,6 +3,10 @@ */ package akka +import akka.actor._ +import akka.dispatch.{ Future, Promise } +import akka.util.{ Timeout, Duration } + /** * == Commonly Used Patterns With Akka == * @@ -38,10 +42,6 @@ package akka */ package object pattern { - import akka.actor._ - import akka.dispatch.{ Future, Promise } - import akka.util.{ Timeout, Duration } - /** * Import this implicit conversion to gain `?` and `ask` methods on * [[akka.actor.ActorRef]], which will defer to the @@ -88,12 +88,15 @@ package object pattern { * [see [[akka.dispatch.Future]] for a description of `flow`] */ def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout = null): Future[Any] = actorRef match { - case ref: ActorRefWithProvider ⇒ + case ref: InternalActorRef if ref.isTerminated ⇒ + actorRef.tell(message) + Promise.failed(new AskTimeoutException("sending to terminated ref breaks promises"))(ref.provider.dispatcher) + case ref: InternalActorRef ⇒ val provider = ref.provider (if (timeout == null) provider.settings.ActorTimeout else timeout) match { case t if t.duration.length <= 0 ⇒ actorRef.tell(message) - Promise.failed(new AskTimeoutException("failed to create PromiseActorRef"))(provider.dispatcher) + Promise.failed(new AskTimeoutException("not asking with negative timeout"))(provider.dispatcher) case t ⇒ val a = AskSupport.createAsker(provider, t) actorRef.tell(message, a) @@ -132,7 +135,7 @@ package object pattern { def pipeTo[T](future: Future[T], actorRef: ActorRef): Future[T] = { future onComplete { case Right(r) ⇒ actorRef ! r - case Left(f) ⇒ actorRef ! akka.actor.Status.Failure(f) + case Left(f) ⇒ actorRef ! Status.Failure(f) } future } diff --git a/akka-actor/src/main/scala/akka/util/Duration.scala b/akka-actor/src/main/scala/akka/util/Duration.scala index 1406ad8564..1442d907d7 100644 --- a/akka-actor/src/main/scala/akka/util/Duration.scala +++ b/akka-actor/src/main/scala/akka/util/Duration.scala @@ -568,4 +568,3 @@ object Timeout { implicit def intToTimeout(timeout: Int) = new Timeout(timeout) implicit def longToTimeout(timeout: Long) = new Timeout(timeout) } - diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index cd19a0c781..8778d95760 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -124,7 +124,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { * that new state can be obtained within the given timeout. */ def alter(f: T ⇒ T)(timeout: Timeout): Future[T] = { - def dispatch = updater.?(Alter(f), timeout).asInstanceOf[Future[T]] + def dispatch = ask(updater, Alter(f))(timeout).asInstanceOf[Future[T]] val txn = Txn.findCurrent if (txn.isDefined) { val result = Promise[T]()(system.dispatcher) @@ -172,7 +172,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { send((value: T) ⇒ { suspend() val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher("akka.agent.alter-off-dispatcher")) - result completeWith threadBased.?(Alter(f), timeout).asInstanceOf[Future[T]] + result completeWith ask(threadBased, Alter(f))(timeout).asInstanceOf[Future[T]] value }) result diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index bb82aab595..a753325429 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -252,7 +252,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { //#using-explicit-timeout import akka.util.duration._ import akka.pattern.ask - val future = myActor ? ("hello", timeout = 500 millis) + val future = myActor.ask("hello")(500 millis) //#using-explicit-timeout Await.result(future, 500 millis) must be("hello") } diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index f76aa8e908..628f9aea97 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -124,7 +124,7 @@ case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMs * It acts as the brain of the remote that responds to system remote events (messages) and undertakes action. */ class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) - extends VirtualPathContainer(_path, _parent, _log) { + extends VirtualPathContainer(system.provider, _path, _parent, _log) { /** * Find the longest matching path which we know about and return that ref diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index a04d37e207..388007ae33 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -166,7 +166,7 @@ private[akka] class RemoteActorRef private[akka] ( val path: ActorPath, val getParent: InternalActorRef, loader: Option[ClassLoader]) - extends InternalActorRef with RemoteRef with ActorRefWithProvider { + extends InternalActorRef with RemoteRef { def getChild(name: Iterator[String]): InternalActorRef = { val s = name.toStream