diff --git a/akka-actor-migration/src/main/scala/akka/actor/OldActor.scala b/akka-actor-migration/src/main/scala/akka/actor/OldActor.scala index 0a9238209e..e923773bd8 100644 --- a/akka-actor-migration/src/main/scala/akka/actor/OldActor.scala +++ b/akka-actor-migration/src/main/scala/akka/actor/OldActor.scala @@ -10,6 +10,7 @@ import akka.dispatch.OldFuture import akka.util.Duration import java.util.concurrent.TimeUnit import java.net.InetSocketAddress +import akka.migration.AskableActorRef /** * Migration replacement for `object akka.actor.Actor`. @@ -54,7 +55,6 @@ object OldActor { @deprecated("OldActor.remote should not be used", "2.0") lazy val remote: OldRemoteSupport = new OldRemoteSupport - } @deprecated("use Actor", "2.0") @@ -66,6 +66,8 @@ abstract class OldActor extends Actor { implicit def actorRef2OldActorRef(actorRef: ActorRef) = new OldActorRef(actorRef) + implicit def askableActorRef(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef) + @deprecated("Use context.become instead", "2.0") def become(behavior: Receive, discardOld: Boolean = true) = context.become(behavior, discardOld) diff --git a/akka-actor-migration/src/main/scala/akka/migration/AskableActorRef.scala b/akka-actor-migration/src/main/scala/akka/migration/AskableActorRef.scala new file mode 100644 index 0000000000..942d8ae47a --- /dev/null +++ b/akka-actor-migration/src/main/scala/akka/migration/AskableActorRef.scala @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.migration + +import akka.actor.ActorRef +import akka.dispatch.Future +import akka.util.Timeout + +/** + * Implementation detail of the “ask” pattern enrichment of ActorRef + */ +private[akka] final class AskableActorRef(val actorRef: ActorRef) { + + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * flow { + * val f = worker.ask(request)(timeout) + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] + */ + def ask(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout) + + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * flow { + * val f = worker ? request + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] + */ + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout) + + /** + * This method is just there to catch 2.0-unsupported usage and print deprecation warnings for it. + */ + @deprecated("use ?(msg)(timeout), this method has dangerous ambiguity", "2.0-migration") + def ?(message: Any, timeout: Timeout)(i: Int = 0): Future[Any] = this.?(message)(timeout) +} \ No newline at end of file diff --git a/akka-actor-migration/src/main/scala/akka/migration/package.scala b/akka-actor-migration/src/main/scala/akka/migration/package.scala index 319fdd997e..4fd8d5eeea 100644 --- a/akka-actor-migration/src/main/scala/akka/migration/package.scala +++ b/akka-actor-migration/src/main/scala/akka/migration/package.scala @@ -31,4 +31,7 @@ package object migration { def stop(): Unit = GlobalActorSystem.stop(actorRef) } + implicit def ask(actorRef: ActorRef) = new akka.migration.AskableActorRef(actorRef) + def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout = null): Future[Any] = akka.pattern.ask(actorRef, message)(timeout) + } \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala index 9fa3ef2709..c9f1725692 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala @@ -8,6 +8,7 @@ import akka.testkit._ import org.scalatest.BeforeAndAfterEach import akka.util.duration._ import akka.dispatch.Await +import akka.pattern.ask object ActorFireForgetRequestReplySpec { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index 5fb9187cf7..d2c8a4bd47 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -12,6 +12,7 @@ import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic._ import akka.dispatch.Await +import akka.pattern.ask object ActorLifeCycleSpec { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala index 38695f8258..3d4f61caa1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala @@ -6,6 +6,7 @@ package akka.actor import akka.testkit._ import akka.util.duration._ import akka.dispatch.Await +import akka.pattern.ask object ActorLookupSpec { @@ -39,11 +40,13 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { val c2 = system.actorOf(p, "c2") val c21 = Await.result((c2 ? Create("c21")).mapTo[ActorRef], timeout.duration) - val user = system.asInstanceOf[ActorSystemImpl].guardian - val syst = system.asInstanceOf[ActorSystemImpl].systemGuardian - val root = system.asInstanceOf[ActorSystemImpl].lookupRoot + val sysImpl = system.asInstanceOf[ActorSystemImpl] - def empty(path: String) = new EmptyLocalActorRef(system.eventStream, system.dispatcher, path match { + val user = sysImpl.guardian + val syst = sysImpl.systemGuardian + val root = sysImpl.lookupRoot + + def empty(path: String) = new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, path match { case RelativeActorPath(elems) ⇒ system.actorFor("/").path / elems }) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index b4ab7f066d..1f5120decd 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -15,6 +15,7 @@ import akka.util.ReflectiveAccess import akka.serialization.Serialization import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.dispatch.{ Await, DefaultPromise, Promise, Future } +import akka.pattern.ask object ActorRefSpec { @@ -287,7 +288,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { val baos = new ByteArrayOutputStream(8192 * 32) val out = new ObjectOutputStream(baos) - val addr = system.asInstanceOf[ActorSystemImpl].provider.rootPath.address + val sysImpl = system.asInstanceOf[ActorSystemImpl] + val addr = sysImpl.provider.rootPath.address val serialized = SerializedActorRef(addr + "/non-existing") out.writeObject(serialized) @@ -295,9 +297,9 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { out.flush out.close - Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { + Serialization.currentSystem.withValue(sysImpl) { val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) - in.readObject must be === new EmptyLocalActorRef(system.eventStream, system.dispatcher, system.actorFor("/").path / "non-existing") + in.readObject must be === new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, system.actorFor("/").path / "non-existing") } } @@ -358,8 +360,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { } })) - val ffive = (ref ? (5, timeout)).mapTo[String] - val fnull = (ref ? (null, timeout)).mapTo[String] + val ffive = (ref.ask(5)(timeout)).mapTo[String] + val fnull = (ref.ask(null)(timeout)).mapTo[String] ref ! PoisonPill Await.result(ffive, timeout.duration) must be("five") diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala index 34e073cd2f..c8df739b48 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala @@ -10,6 +10,7 @@ import akka.testkit.DefaultTimeout import java.util.concurrent.TimeoutException import akka.dispatch.Await import akka.util.Timeout +import akka.pattern.{ ask, AskTimeoutException } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout { @@ -44,7 +45,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo "use explicitly supplied timeout" in { within(testTimeout - 100.millis, testTimeout + 300.millis) { val echo = system.actorOf(Props.empty) - val f = echo.?("hallo", testTimeout) + val f = echo.?("hallo")(testTimeout) try { intercept[AskTimeoutException] { Await.result(f, testTimeout + 300.millis) } } finally { system.stop(echo) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 781b227f6c..44d3daa9e2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -8,6 +8,7 @@ import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic._ import akka.dispatch.Await +import akka.pattern.ask @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeout with DeathWatchSpec diff --git a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala index aaa19b4561..2566e5a955 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala @@ -9,6 +9,7 @@ import akka.util.duration._ import Actor._ import akka.util.Duration import akka.dispatch.Await +import akka.pattern.ask object ForwardActorSpec { val ExpectedMessage = "FOO" @@ -46,7 +47,7 @@ class ForwardActorSpec extends AkkaSpec { "forward actor reference when invoking forward on ask" in { val chain = createForwardingChain(system) - chain.ask(ExpectedMessage, 5000) onSuccess { case ExpectedMessage ⇒ testActor ! ExpectedMessage } + chain.ask(ExpectedMessage)(5 seconds) onSuccess { case ExpectedMessage ⇒ testActor ! ExpectedMessage } expectMsg(5 seconds, ExpectedMessage) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index e01729d632..c20d27d8ca 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -9,6 +9,7 @@ import akka.util.duration._ import scala.util.continuations._ import akka.testkit._ import akka.dispatch.{ Await, Future, Promise, ExecutionContext, MessageDispatcher } +import akka.pattern.ask object IOActorSpec { diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index 4b7853af95..beeb243ce8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -14,6 +14,7 @@ import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout import akka.testkit.TestLatch import akka.util.duration._ +import akka.pattern.ask @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RestartStrategySpec extends AkkaSpec with DefaultTimeout { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 3a7e91a47d..a07af9d2eb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -5,6 +5,7 @@ import akka.util.duration._ import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit } import akka.testkit._ import akka.dispatch.Await +import akka.pattern.ask import java.util.concurrent.atomic.AtomicInteger @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index d0be683c94..60107b9754 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -8,6 +8,7 @@ import akka.testkit._ import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.dispatch.Await +import akka.pattern.ask object SupervisorHierarchySpec { class FireWorkerException(msg: String) extends Exception(msg) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index 6e63a31d9a..de617c8db2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -8,6 +8,7 @@ import akka.dispatch.{ PinnedDispatcher, Dispatchers, Await } import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout +import akka.pattern.ask object SupervisorMiscSpec { val config = """ diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 4a0f8e997f..6b96038000 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -11,6 +11,7 @@ import akka.testkit.TestEvent._ import akka.testkit._ import java.util.concurrent.atomic.AtomicInteger import akka.dispatch.Await +import akka.pattern.ask object SupervisorSpec { val Timeout = 5 seconds @@ -130,12 +131,12 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende } def ping(pingPongActor: ActorRef) = { - Await.result(pingPongActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage + Await.result(pingPongActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage expectMsg(Timeout, PingMessage) } def kill(pingPongActor: ActorRef) = { - val result = (pingPongActor ? (DieReply, TimeoutMillis)) + val result = (pingPongActor.?(DieReply)(TimeoutMillis)) expectMsg(Timeout, ExceptionMessage) intercept[RuntimeException] { Await.result(result, TimeoutMillis millis) } } @@ -153,7 +154,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "not restart temporary actor" in { val (temporaryActor, _) = temporaryActorAllForOne - intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) } + intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis) } expectNoMsg(1 second) } @@ -299,11 +300,11 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1), EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) { intercept[RuntimeException] { - Await.result(dyingActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) + Await.result(dyingActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis) } } - Await.result(dyingActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage + Await.result(dyingActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage inits.get must be(3) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index ea97415e72..04c1292d15 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -12,6 +12,7 @@ import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender import akka.testkit.DefaultTimeout import akka.dispatch.{ Await, Dispatchers } +import akka.pattern.ask @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeout { diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index cc4e7b2f74..38292decde 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -11,6 +11,7 @@ import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender import akka.testkit.DefaultTimeout import akka.dispatch.Await +import akka.pattern.ask @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout { diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 92cec0a79f..ced257dfc8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -18,6 +18,7 @@ import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.japi.{ Creator, Option ⇒ JOption } import akka.testkit.DefaultTimeout import akka.dispatch.{ Await, Dispatchers, Future, Promise } +import akka.pattern.ask object TypedActorSpec { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 417a174f8b..e2b697a08f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -20,6 +20,7 @@ import akka.util.duration._ import akka.event.Logging.Error import com.typesafe.config.Config import akka.util.Duration +import akka.pattern.ask object ActorModelSpec { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala index d75bad30c6..2dce8346db 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala @@ -8,6 +8,7 @@ import akka.util.Duration import akka.util.duration._ import akka.testkit.DefaultTimeout import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers, Dispatcher } +import akka.pattern.ask object DispatcherActorSpec { val config = """ diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala index 6ac18f9947..6c66784e5d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala @@ -7,6 +7,7 @@ import akka.actor.{ Props, Actor } import akka.testkit.AkkaSpec import org.scalatest.BeforeAndAfterEach import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers } +import akka.pattern.ask object PinnedActorSpec { val config = """ diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala index c4668db428..09fba90fc8 100644 --- a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala +++ b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala @@ -9,6 +9,7 @@ import akka.actor.future2actor import akka.util.duration._ import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout +import akka.pattern.ask class Future2ActorSpec extends AkkaSpec with DefaultTimeout { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 943a24025b..71db22cd9a 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -15,6 +15,7 @@ import akka.testkit.DefaultTimeout import akka.testkit.TestLatch import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch } import scala.runtime.NonLocalReturnControl +import akka.pattern.ask import java.lang.{ IllegalStateException, ArithmeticException } object FutureSpec { @@ -323,7 +324,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa })) } val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(timeout).mapTo[Int] } Await.result(Future.fold(futures)(0)(_ + _), timeout millis) must be(45) } @@ -351,7 +352,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); sender.tell(add) } })) } - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), 10000).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(10000).mapTo[Int] } Await.result(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)), timeout.duration) must be(45) } @@ -368,7 +369,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa })) } val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100))(timeout).mapTo[Int] } intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected") } } @@ -400,7 +401,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa })) } val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(timeout).mapTo[Int] } assert(Await.result(Future.reduce(futures)(_ + _), timeout millis) === 45) } @@ -417,7 +418,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa })) } val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100))(timeout).mapTo[Int] } intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected" } } @@ -458,7 +459,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "shouldHandleThrowables" in { class ThrowableTest(m: String) extends Throwable(m) - filterException[ThrowableTest] { + EventFilter[ThrowableTest](occurrences = 4) intercept { val f1 = Future[Any] { throw new ThrowableTest("test") } intercept[ThrowableTest] { Await.result(f1, timeout.duration) } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index ec6aab48be..2bb4ab73e5 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -2,7 +2,7 @@ package akka.dispatch import akka.actor.{ Props, LocalActorRef, Actor } import akka.testkit.AkkaSpec -import akka.util.Duration +import akka.pattern.ask import akka.util.duration._ import akka.testkit.DefaultTimeout import com.typesafe.config.Config diff --git a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala new file mode 100644 index 0000000000..ecb9690594 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.pattern + +import akka.testkit.AkkaSpec +import akka.util.duration._ + +class AskSpec extends AkkaSpec { + + "The “ask” pattern" must { + + "return broken promises on DeadLetters" in { + val dead = system.actorFor("/system/deadLetters") + val f = dead.ask(42)(1 second) + f.isCompleted must be(true) + f.value.get match { + case Left(_: AskTimeoutException) ⇒ + case v ⇒ fail(v + " was not Left(AskTimeoutException)") + } + } + + "return broken promises on EmptyLocalActorRefs" in { + val empty = system.actorFor("unknown") + implicit val timeout = system.settings.ActorTimeout + val f = empty ? 3.14 + f.isCompleted must be(true) + f.value.get match { + case Left(_: AskTimeoutException) ⇒ + case v ⇒ fail(v + " was not Left(AskTimeoutException)") + } + } + + } + +} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index f213379d17..f2707e042c 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -6,6 +6,7 @@ import java.util.concurrent.atomic.AtomicInteger import akka.testkit._ import akka.util.duration._ import akka.dispatch.Await +import akka.pattern.ask object ConfiguredLocalRoutingSpec { val config = """ diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index d2849188a8..863922491b 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -10,6 +10,7 @@ import akka.dispatch.Await import akka.util.duration._ import akka.actor.ActorRef import java.util.concurrent.atomic.AtomicInteger +import akka.pattern.ask object ResizerSpec { diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 9529854314..9de51bdabf 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -12,6 +12,7 @@ import akka.dispatch.Await import akka.util.Duration import akka.config.ConfigurationException import com.typesafe.config.ConfigFactory +import akka.pattern.ask import java.util.concurrent.ConcurrentHashMap import com.typesafe.config.Config diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index bb86afb5eb..32f2ede8bf 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -13,6 +13,7 @@ import akka.util.Timeout import akka.util.duration._ import scala.reflect.BeanInfo import com.google.protobuf.Message +import akka.pattern.ask class ProtobufSerializer extends Serializer { val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) diff --git a/akka-actor/src/main/java/com/typesafe/config/Config.java b/akka-actor/src/main/java/com/typesafe/config/Config.java index d3496c73ef..e820d1b369 100644 --- a/akka-actor/src/main/java/com/typesafe/config/Config.java +++ b/akka-actor/src/main/java/com/typesafe/config/Config.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigException.java b/akka-actor/src/main/java/com/typesafe/config/ConfigException.java index b8dcb8ca00..3c31d811c3 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigException.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigException.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigFactory.java b/akka-actor/src/main/java/com/typesafe/config/ConfigFactory.java index df5e762a5c..bb3f0aa131 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigFactory.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigIncludeContext.java b/akka-actor/src/main/java/com/typesafe/config/ConfigIncludeContext.java index 2be0abff34..ac3644a5af 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigIncludeContext.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigIncludeContext.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigIncluder.java b/akka-actor/src/main/java/com/typesafe/config/ConfigIncluder.java index 1ac6f3383d..38e8e35a91 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigIncluder.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigIncluder.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigList.java b/akka-actor/src/main/java/com/typesafe/config/ConfigList.java index 2024efe744..5c694a508e 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigList.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigList.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigMergeable.java b/akka-actor/src/main/java/com/typesafe/config/ConfigMergeable.java index 1214db8c44..c760bf9d15 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigMergeable.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigMergeable.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigObject.java b/akka-actor/src/main/java/com/typesafe/config/ConfigObject.java index bb4d14da89..16b9837f6d 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigObject.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigObject.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigOrigin.java b/akka-actor/src/main/java/com/typesafe/config/ConfigOrigin.java index fbc1fe17c3..c34767fb2e 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigOrigin.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigOrigin.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigParseOptions.java b/akka-actor/src/main/java/com/typesafe/config/ConfigParseOptions.java index f3765a5479..2d057e812f 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigParseOptions.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigParseOptions.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigParseable.java b/akka-actor/src/main/java/com/typesafe/config/ConfigParseable.java index 1cc39614ca..8c19085513 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigParseable.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigParseable.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigResolveOptions.java b/akka-actor/src/main/java/com/typesafe/config/ConfigResolveOptions.java index 3adb589f1d..d82a6be71a 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigResolveOptions.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigResolveOptions.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigSyntax.java b/akka-actor/src/main/java/com/typesafe/config/ConfigSyntax.java index 7626a92e6d..54529fad0b 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigSyntax.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigSyntax.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigValue.java b/akka-actor/src/main/java/com/typesafe/config/ConfigValue.java index b636c6f4cd..1f389be08f 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigValue.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigValue.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigValueFactory.java b/akka-actor/src/main/java/com/typesafe/config/ConfigValueFactory.java index 14c2bff8f7..babace186e 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigValueFactory.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigValueFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigValueType.java b/akka-actor/src/main/java/com/typesafe/config/ConfigValueType.java index 981cb6d189..a15774d3ce 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigValueType.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigValueType.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/AbstractConfigObject.java b/akka-actor/src/main/java/com/typesafe/config/impl/AbstractConfigObject.java index a21ccd81f7..1eb31c6397 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/AbstractConfigObject.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/AbstractConfigObject.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/AbstractConfigValue.java b/akka-actor/src/main/java/com/typesafe/config/impl/AbstractConfigValue.java index e51f4c6067..46063f410e 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/AbstractConfigValue.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/AbstractConfigValue.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigBoolean.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigBoolean.java index c926c0c942..a1a7677c78 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigBoolean.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigBoolean.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigDelayedMerge.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigDelayedMerge.java index 4cca7834bd..218aaca306 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigDelayedMerge.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigDelayedMerge.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigDelayedMergeObject.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigDelayedMergeObject.java index fe970d59c8..a9ff7bd3bf 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigDelayedMergeObject.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigDelayedMergeObject.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigDouble.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigDouble.java index c26d3cd6a9..65cfb5d966 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigDouble.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigDouble.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigImpl.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigImpl.java index 217f4385e9..73ddfdce5a 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigImpl.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigImpl.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigImplUtil.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigImplUtil.java index cbc0ecca09..4cbcff7aa3 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigImplUtil.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigImplUtil.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigInt.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigInt.java index 440b5ae8cf..d33bd21274 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigInt.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigInt.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigLong.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigLong.java index 6a72bc4cab..a2e2ab3228 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigLong.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigLong.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigNull.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigNull.java index fbdc21d7a5..91b8586674 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigNull.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigNull.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigNumber.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigNumber.java index 4a6bbd0b15..d2f4bb6ac4 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigNumber.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigNumber.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigString.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigString.java index 9b41e7f7ab..f5293c8fc7 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigString.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigString.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigSubstitution.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigSubstitution.java index f4441b81a5..23776e0011 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigSubstitution.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigSubstitution.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/DefaultTransformer.java b/akka-actor/src/main/java/com/typesafe/config/impl/DefaultTransformer.java index 4391814acb..9a9bf5c6a9 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/DefaultTransformer.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/DefaultTransformer.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/FromMapMode.java b/akka-actor/src/main/java/com/typesafe/config/impl/FromMapMode.java index ce6c3e3f0a..764a45664c 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/FromMapMode.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/FromMapMode.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/Parseable.java b/akka-actor/src/main/java/com/typesafe/config/impl/Parseable.java index 62b8ee575a..4938603199 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/Parseable.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/Parseable.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/Parser.java b/akka-actor/src/main/java/com/typesafe/config/impl/Parser.java index 5df0314fe6..1ba8535207 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/Parser.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/Parser.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/Path.java b/akka-actor/src/main/java/com/typesafe/config/impl/Path.java index 193d930002..af6dfe8bf5 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/Path.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/Path.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/PathBuilder.java b/akka-actor/src/main/java/com/typesafe/config/impl/PathBuilder.java index f46e78201e..ede6c66387 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/PathBuilder.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/PathBuilder.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/PropertiesParser.java b/akka-actor/src/main/java/com/typesafe/config/impl/PropertiesParser.java index bd822e65e9..7c8c81fb07 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/PropertiesParser.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/PropertiesParser.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ResolveStatus.java b/akka-actor/src/main/java/com/typesafe/config/impl/ResolveStatus.java index 3f73eb5221..8deeaf520f 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ResolveStatus.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ResolveStatus.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfig.java b/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfig.java index 17979ba6cc..a87b12316a 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfig.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfig.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigList.java b/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigList.java index 1921826352..fa91ea6627 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigList.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigList.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigObject.java b/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigObject.java index 953f26491f..c961ae6b34 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigObject.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigObject.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigOrigin.java b/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigOrigin.java index f0a0dbd353..4f8859f050 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigOrigin.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigOrigin.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/SubstitutionResolver.java b/akka-actor/src/main/java/com/typesafe/config/impl/SubstitutionResolver.java index 65a2b30900..7bb3bf3a61 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/SubstitutionResolver.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/SubstitutionResolver.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/Token.java b/akka-actor/src/main/java/com/typesafe/config/impl/Token.java index afff3247d6..5f16d26e1d 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/Token.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/Token.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/TokenType.java b/akka-actor/src/main/java/com/typesafe/config/impl/TokenType.java index fc617d9ee2..7853c09445 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/TokenType.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/TokenType.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/Tokenizer.java b/akka-actor/src/main/java/com/typesafe/config/impl/Tokenizer.java index 280a028077..2fcee8e61a 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/Tokenizer.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/Tokenizer.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/Tokens.java b/akka-actor/src/main/java/com/typesafe/config/impl/Tokens.java index d726d83d53..83bec62af3 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/Tokens.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/Tokens.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/Unmergeable.java b/akka-actor/src/main/java/com/typesafe/config/impl/Unmergeable.java index e0d114e78d..0028f2e023 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/Unmergeable.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/Unmergeable.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe Inc. + * Copyright (C) 2011-2012 Typesafe Inc. */ package com.typesafe.config.impl; diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index c623e438de..5ccce6906c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -104,33 +104,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable */ final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender) - /** - * Akka Java API. - * - * Sends a message asynchronously returns a future holding the eventual reply message. - * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given - * timeout has expired. - * - * NOTE: - * Use this method with care. In most cases it is better to use 'tell' together with the sender - * parameter to implement non-blocking request/response message exchanges. - * - * If you are sending messages using ask and using blocking operations on the Future, such as - * 'get', then you have to use getContext().sender().tell(...) - * in the target actor to send a reply message to the original sender, and thereby completing the Future, - * otherwise the sender will block until the timeout expires. - * - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s reference, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - */ - def ask(message: Any, timeout: Timeout): Future[AnyRef] = ?(message, timeout).asInstanceOf[Future[AnyRef]] - - def ask(message: Any, timeoutMillis: Long): Future[AnyRef] = ask(message, new Timeout(timeoutMillis)) - /** * Forwards the message and passes the original sender actor as the sender. *

@@ -179,35 +152,6 @@ trait ScalaActorRef { ref: ActorRef ⇒ */ def !(message: Any)(implicit sender: ActorRef = null): Unit - /** - * Sends a message asynchronously, returning a future which may eventually hold the reply. - * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given - * timeout has expired. - * - * NOTE: - * Use this method with care. In most cases it is better to use '!' together with implicit or explicit - * sender parameter to implement non-blocking request/response message exchanges. - * - * If you are sending messages using ask and using blocking operations on the Future, such as - * 'get', then you have to use getContext().sender().tell(...) - * in the target actor to send a reply message to the original sender, and thereby completing the Future, - * otherwise the sender will block until the timeout expires. - * - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s reference, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - */ - def ?(message: Any)(implicit timeout: Timeout): Future[Any] - - /** - * Sends a message asynchronously, returning a future which may eventually hold the reply. - * The implicit parameter with the default value is just there to disambiguate it from the version that takes the - * implicit timeout - */ - def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) } /** @@ -230,12 +174,25 @@ trait LocalRef extends ActorRefScope { * DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA! */ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope ⇒ + /* + * Actor life-cycle management, invoked only internally (in response to user requests via ActorContext). + */ def resume(): Unit def suspend(): Unit def restart(cause: Throwable): Unit def stop(): Unit def sendSystemMessage(message: SystemMessage): Unit + + /** + * Get a reference to the actor ref provider which created this ref. + */ + def provider: ActorRefProvider + + /** + * Obtain parent of this ref; used by getChild for ".." paths. + */ def getParent: InternalActorRef + /** * Obtain ActorRef by possibly traversing the actor tree or looking it up at * some provider-specific location. This method shall return the end result, @@ -245,6 +202,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe * exist, return Nobody. */ def getChild(name: Iterator[String]): InternalActorRef + /** * Scope: if this ref points to an actor which resides within the same JVM, * i.e. whose mailbox is directly reachable etc. @@ -252,8 +210,12 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe def isLocal: Boolean } +/** + * This is an internal look-up failure token, not useful for anything else. + */ private[akka] case object Nobody extends MinimalActorRef { val path = new RootActorPath(new LocalAddress("all-systems"), "/Nobody") + def provider = throw new UnsupportedOperationException("Nobody does not provide") } /** @@ -321,6 +283,8 @@ private[akka] class LocalActorRef private[akka] ( def getParent: InternalActorRef = actorCell.parent + def provider = actorCell.provider + /** * Method for looking up a single child beneath this actor. Override in order * to inject “synthetic” actor paths like “/temp”. @@ -365,17 +329,6 @@ private[akka] class LocalActorRef private[akka] ( def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender) - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { - actorCell.provider.ask(timeout) match { - case Some(a) ⇒ - this.!(message)(a) - a.result - case None ⇒ - this.!(message)(null) - Promise[Any]()(actorCell.system.dispatcher) - } - } - def restart(cause: Throwable): Unit = actorCell.restart(cause) @throws(classOf[java.io.ObjectStreamException]) @@ -402,9 +355,10 @@ case class SerializedActorRef(path: String) { /** * Trait for ActorRef implementations where all methods contain default stubs. */ -trait MinimalActorRef extends InternalActorRef with LocalRef { +private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef { def getParent: InternalActorRef = Nobody + def getChild(names: Iterator[String]): InternalActorRef = { val dropped = names.dropWhile(_.isEmpty) if (dropped.isEmpty) this @@ -420,9 +374,6 @@ trait MinimalActorRef extends InternalActorRef with LocalRef { def !(message: Any)(implicit sender: ActorRef = null): Unit = () - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = - throw new UnsupportedOperationException("Not supported for [%s]".format(getClass.getName)) - def sendSystemMessage(message: SystemMessage): Unit = () def restart(cause: Throwable): Unit = () @@ -430,9 +381,10 @@ trait MinimalActorRef extends InternalActorRef with LocalRef { protected def writeReplace(): AnyRef = SerializedActorRef(path.toString) } -object MinimalActorRef { - def apply(_path: ActorPath)(receive: PartialFunction[Any, Unit]): ActorRef = new MinimalActorRef { +private[akka] object MinimalActorRef { + def apply(_path: ActorPath, _provider: ActorRefProvider)(receive: PartialFunction[Any, Unit]): ActorRef = new MinimalActorRef { def path = _path + def provider = _provider override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (receive.isDefinedAt(message)) receive(message) } @@ -440,7 +392,7 @@ object MinimalActorRef { case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) -object DeadLetterActorRef { +private[akka] object DeadLetterActorRef { class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance? @throws(classOf[java.io.ObjectStreamException]) private def readResolve(): AnyRef = Serialization.currentSystem.value.deadLetters @@ -449,12 +401,10 @@ object DeadLetterActorRef { val serialized = new SerializedDeadLetterActorRef } -trait DeadLetterActorRefLike extends MinimalActorRef { +private[akka] trait DeadLetterActorRefLike extends MinimalActorRef { def eventStream: EventStream - @volatile - private var brokenPromise: Future[Any] = _ @volatile private var _path: ActorPath = _ def path: ActorPath = { @@ -462,9 +412,13 @@ trait DeadLetterActorRefLike extends MinimalActorRef { _path } - private[akka] def init(dispatcher: MessageDispatcher, path: ActorPath) { + @volatile + private var _provider: ActorRefProvider = _ + def provider = _provider + + private[akka] def init(provider: ActorRefProvider, path: ActorPath) { _path = path - brokenPromise = Promise.failed(new ActorKilledException("In DeadLetterActorRef - promises are always broken."))(dispatcher) + _provider = provider } override def isTerminated(): Boolean = true @@ -473,16 +427,9 @@ trait DeadLetterActorRefLike extends MinimalActorRef { case d: DeadLetter ⇒ eventStream.publish(d) case _ ⇒ eventStream.publish(DeadLetter(message, sender, this)) } - - override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { - eventStream.publish(DeadLetter(message, this, this)) - // leave this in: guard with good visibility against really stupid/weird errors - assert(brokenPromise != null) - brokenPromise - } } -class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRefLike { +private[akka] class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRefLike { @throws(classOf[java.io.ObjectStreamException]) override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized } @@ -491,16 +438,28 @@ class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRe * This special dead letter reference has a name: it is that which is returned * by a local look-up which is unsuccessful. */ -class EmptyLocalActorRef(val eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath) - extends DeadLetterActorRefLike { - init(_dispatcher, _path) +private[akka] class EmptyLocalActorRef( + val eventStream: EventStream, + _provider: ActorRefProvider, + _dispatcher: MessageDispatcher, + _path: ActorPath) extends DeadLetterActorRefLike { + + init(_provider, _path) + override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { case d: DeadLetter ⇒ // do NOT form endless loops case _ ⇒ eventStream.publish(DeadLetter(message, sender, this)) } } -class VirtualPathContainer(val path: ActorPath, override val getParent: InternalActorRef, val log: LoggingAdapter) extends MinimalActorRef { +/** + * Internal implementation detail used for paths like “/temp” + */ +private[akka] class VirtualPathContainer( + val provider: ActorRefProvider, + val path: ActorPath, + override val getParent: InternalActorRef, + val log: LoggingAdapter) extends MinimalActorRef { private val children = new ConcurrentHashMap[String, InternalActorRef] @@ -534,41 +493,3 @@ class VirtualPathContainer(val path: ActorPath, override val getParent: Internal } } } - -/** - * This is what is used to complete a Future that is returned from an ask/? call, - * when it times out. - */ -class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException { - def this(message: String) = this(message, null: Throwable) -} - -class AskActorRef( - val path: ActorPath, - override val getParent: InternalActorRef, - val dispatcher: MessageDispatcher, - val deathWatch: DeathWatch) extends MinimalActorRef { - - final val running = new AtomicBoolean(true) - final val result = Promise[Any]()(dispatcher) - - override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (running.get) message match { - case Status.Success(r) ⇒ result.success(r) - case Status.Failure(f) ⇒ result.failure(f) - case other ⇒ result.success(other) - } - - override def sendSystemMessage(message: SystemMessage): Unit = message match { - case _: Terminate ⇒ stop() - case _ ⇒ - } - - override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = - Promise.failed(new UnsupportedOperationException("Ask/? is not supported for [%s]".format(getClass.getName)))(dispatcher) - - override def isTerminated = result.isCompleted - - override def stop(): Unit = if (running.getAndSet(false)) { - deathWatch.publish(Terminated(this)) - } -} diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala old mode 100644 new mode 100755 index 49f241d0c6..db7ca09664 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -47,6 +47,8 @@ trait ActorRefProvider { def settings: ActorSystem.Settings + def dispatcher: MessageDispatcher + /** * Initialization of an ActorRefProvider happens in two steps: first * construction of the object with settings, eventStream, scheduler, etc. @@ -59,6 +61,26 @@ trait ActorRefProvider { def scheduler: Scheduler + /** + * Generates and returns a unique actor path below “/temp”. + */ + def tempPath(): ActorPath + + /** + * Returns the actor reference representing the “/temp” path. + */ + def tempContainer: InternalActorRef + + /** + * Registers an actorRef at a path returned by tempPath(); do NOT pass in any other path. + */ + def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit + + /** + * Unregister a temporary actor from the “/temp” path (i.e. obtained from tempPath()); do NOT pass in any other path. + */ + def unregisterTempActor(path: ActorPath): Unit + /** * Actor factory with create-only semantics: will create an actor as * described by props with the given supervisor and path (may be different @@ -89,12 +111,6 @@ trait ActorRefProvider { */ def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef - /** - * Create AskActorRef and register it properly so it can be serialized/deserialized; - * caller needs to send the message. - */ - def ask(within: Timeout): Option[AskActorRef] - /** * This Future is completed upon termination of this ActorRefProvider, which * is usually initiated by stopping the guardian via ActorSystem.stop(). @@ -308,6 +324,8 @@ class LocalActorRefProvider( val path = rootPath / "bubble-walker" + def provider: ActorRefProvider = LocalActorRefProvider.this + override def stop() = stopped switchOn { terminationFuture.complete(causeOfTermination.toLeft(())) } @@ -426,7 +444,17 @@ class LocalActorRefProvider( lazy val systemGuardian: InternalActorRef = actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true, None) - lazy val tempContainer = new VirtualPathContainer(tempNode, rootGuardian, log) + lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log) + + def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = { + assert(path.parent eq tempNode, "cannot registerTempActor() with anything not obtained from tempPath()") + tempContainer.addChild(path.name, actorRef) + } + + def unregisterTempActor(path: ActorPath): Unit = { + assert(path.parent eq tempNode, "cannot unregisterTempActor() with anything not obtained from tempPath()") + tempContainer.removeChild(path.name) + } val deathWatch = new LocalDeathWatch(1024) //TODO make configrable @@ -465,7 +493,7 @@ class LocalActorRefProvider( } else ref.getChild(path.iterator) match { case Nobody ⇒ log.debug("look-up of path sequence '{}' failed", path) - new EmptyLocalActorRef(eventStream, dispatcher, ref.path / path) + new EmptyLocalActorRef(eventStream, system.provider, dispatcher, ref.path / path) case x ⇒ x } @@ -480,25 +508,6 @@ class LocalActorRefProvider( new RoutedActorRef(system, props.withRouter(router.adaptFromDeploy(depl)), supervisor, path) } } - - def ask(within: Timeout): Option[AskActorRef] = { - (if (within == null) settings.ActorTimeout else within) match { - case t if t.duration.length <= 0 ⇒ None - case t ⇒ - val path = tempPath() - val name = path.name - val a = new AskActorRef(path, tempContainer, dispatcher, deathWatch) - tempContainer.addChild(name, a) - val result = a.result - val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) } - result onComplete { _ ⇒ - try { a.stop(); f.cancel() } - finally { tempContainer.removeChild(name) } - } - - Some(a) - } - } } class LocalDeathWatch(val mapSize: Int) extends DeathWatch with ActorClassification { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 1f4bbbf1b3..b77aac491f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -6,6 +6,7 @@ package akka.actor import akka.config.ConfigurationException import akka.event._ import akka.dispatch._ +import akka.pattern.ask import akka.util.duration._ import akka.util.Timeout._ import org.jboss.netty.akka.util.HashedWheelTimer @@ -396,10 +397,10 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor // the provider is expected to start default loggers, LocalActorRefProvider does this provider.init(this) _log = new BusLogging(eventStream, "ActorSystem(" + lookupRoot.path.address + ")", this.getClass) - deadLetters.init(dispatcher, lookupRoot.path / "deadLetters") + deadLetters.init(provider, lookupRoot.path / "deadLetters") registerOnTermination(stopScheduler()) // this starts the reaper actor and the user-configured logging subscribers, which are also actors - _locker = new Locker(scheduler, ReaperInterval, lookupRoot.path / "locker", deathWatch) + _locker = new Locker(scheduler, ReaperInterval, provider, lookupRoot.path / "locker", deathWatch) loadExtensions() if (LogConfigOnStart) logConfiguration() this diff --git a/akka-actor/src/main/scala/akka/actor/Locker.scala b/akka-actor/src/main/scala/akka/actor/Locker.scala index 0aef9e6850..9e34f02332 100644 --- a/akka-actor/src/main/scala/akka/actor/Locker.scala +++ b/akka-actor/src/main/scala/akka/actor/Locker.scala @@ -9,7 +9,15 @@ import akka.util.duration._ import java.util.concurrent.ConcurrentHashMap import akka.event.DeathWatch -class Locker(scheduler: Scheduler, period: Duration, val path: ActorPath, val deathWatch: DeathWatch) extends MinimalActorRef { +/** + * Internal implementation detail for disposing of orphaned actors. + */ +private[akka] class Locker( + scheduler: Scheduler, + period: Duration, + val provider: ActorRefProvider, + val path: ActorPath, + val deathWatch: DeathWatch) extends MinimalActorRef { class DavyJones extends Runnable { def run = { diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index e89523b6bb..a2451173b3 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -32,8 +32,8 @@ trait Scheduler { /** * Schedules a message to be sent repeatedly with an initial delay and * frequency. E.g. if you would like a message to be sent immediately and - * thereafter every 500ms you would set delay = Duration.Zero and frequency - * = Duration(500, TimeUnit.MILLISECONDS) + * thereafter every 500ms you would set delay=Duration.Zero and + * frequency=Duration(500, TimeUnit.MILLISECONDS) * * Java & Scala API */ @@ -260,4 +260,4 @@ class DefaultCancellable(val timeout: HWTimeout) extends Cancellable { def isCancelled: Boolean = { timeout.isCancelled } -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 3bafe9ba4e..83a7c0aa62 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -348,17 +348,18 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == extension.getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean case "hashCode" ⇒ actor.hashCode.asInstanceOf[AnyRef] case _ ⇒ + import akka.pattern.ask MethodCall(method, args) match { case m if m.isOneWay ⇒ actor ! m; null //Null return value - case m if m.returnsFuture_? ⇒ actor.?(m, timeout) + case m if m.returnsFuture_? ⇒ ask(actor, m)(timeout) case m if m.returnsJOption_? || m.returnsOption_? ⇒ - val f = actor.?(m, timeout) + val f = ask(actor, m)(timeout) (try { Await.ready(f, timeout.duration).value } catch { case _: TimeoutException ⇒ None }) match { case None | Some(Right(null)) ⇒ if (m.returnsJOption_?) JOption.none[Any] else None case Some(Right(joption: AnyRef)) ⇒ joption case Some(Left(ex)) ⇒ throw ex } - case m ⇒ Await.result(actor.?(m, timeout), timeout.duration).asInstanceOf[AnyRef] + case m ⇒ Await.result(ask(actor, m)(timeout), timeout.duration).asInstanceOf[AnyRef] } } } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index a5b4f6923c..f75e47f9a4 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -155,6 +155,7 @@ trait LoggingBus extends ActorEventBus { val name = "log" + Extension(system).id() + "-" + simpleName(clazz) val actor = system.systemActorOf(Props(clazz), name) implicit val timeout = Timeout(3 seconds) + import akka.pattern.ask val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch { case _: TimeoutException ⇒ publish(Warning(logName, this.getClass, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) @@ -648,6 +649,7 @@ object Logging { */ class StandardOutLogger extends MinimalActorRef with StdOutLogger { val path: ActorPath = new RootActorPath(LocalAddress("all-systems"), "/StandardOutLogger") + def provider: ActorRefProvider = throw new UnsupportedOperationException("StandardOutLogger does not provide") override val toString = "StandardOutLogger" override def !(message: Any)(implicit sender: ActorRef = null): Unit = print(message) } diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala new file mode 100644 index 0000000000..3e637fc81d --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -0,0 +1,136 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.pattern + +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.TimeoutException +import akka.actor.{ Terminated, Status, MinimalActorRef, InternalActorRef, ActorRef, ActorPath } +import akka.dispatch.{ Promise, Terminate, SystemMessage, Future } +import akka.event.DeathWatch +import akka.actor.ActorRefProvider +import akka.util.Timeout + +/** + * This is what is used to complete a Future that is returned from an ask/? call, + * when it times out. + */ +class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException { + def this(message: String) = this(message, null: Throwable) +} + +/** + * This object contains implementation details of the “ask” pattern. + */ +object AskSupport { + + /** + * Implementation detail of the “ask” pattern enrichment of ActorRef + */ + private[akka] final class AskableActorRef(val actorRef: ActorRef) { + + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * flow { + * val f = worker.ask(request)(timeout) + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] + */ + def ask(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout) + + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * flow { + * val f = worker ? request + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] + */ + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout) + } + + /** + * Akka private optimized representation of the temporary actor spawned to + * receive the reply to an "ask" operation. + */ + private[akka] final class PromiseActorRef( + val provider: ActorRefProvider, + val path: ActorPath, + override val getParent: InternalActorRef, + val result: Promise[Any], + val deathWatch: DeathWatch) extends MinimalActorRef { + + final val running = new AtomicBoolean(true) + + override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (running.get) message match { + case Status.Success(r) ⇒ result.success(r) + case Status.Failure(f) ⇒ result.failure(f) + case other ⇒ result.success(other) + } + + override def sendSystemMessage(message: SystemMessage): Unit = message match { + case _: Terminate ⇒ stop() + case _ ⇒ + } + + override def isTerminated = result.isCompleted + + override def stop(): Unit = if (running.getAndSet(false)) { + deathWatch.publish(Terminated(this)) + } + } + + def createAsker(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = { + val path = provider.tempPath() + val result = Promise[Any]()(provider.dispatcher) + val a = new PromiseActorRef(provider, path, provider.tempContainer, result, provider.deathWatch) + provider.registerTempActor(a, path) + val f = provider.scheduler.scheduleOnce(timeout.duration) { result.failure(new AskTimeoutException("Timed out")) } + result onComplete { _ ⇒ + try { a.stop(); f.cancel() } + finally { provider.unregisterTempActor(path) } + } + a + } +} diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index abf435edc5..c3510d9b68 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -3,16 +3,90 @@ */ package akka.pattern -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.dispatch.Future -import akka.util.Duration - -/** - * Patterns is the Java API for the Akka patterns that provide solutions - * to commonly occurring problems. - */ object Patterns { + import akka.actor.{ ActorRef, ActorSystem } + import akka.dispatch.Future + import akka.pattern.{ ask ⇒ scalaAsk } + import akka.util.{ Timeout, Duration } + + /** + * Java API for `akka.pattern.ask`: + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * final Future f = Patterns.ask(worker, request, timeout); + * f.onSuccess(new Procedure() { + * public void apply(Object o) { + * nextActor.tell(new EnrichedResult(request, o)); + * } + * }); + * }}} + */ + def ask(actor: ActorRef, message: Any, timeout: Timeout): Future[AnyRef] = scalaAsk(actor, message)(timeout).asInstanceOf[Future[AnyRef]] + + /** + * Java API for `akka.pattern.ask`: + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * final Future f = Patterns.ask(worker, request, timeout); + * f.onSuccess(new Procedure() { + * public void apply(Object o) { + * nextActor.tell(new EnrichedResult(request, o)); + * } + * }); + * }}} + */ + def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[AnyRef] = scalaAsk(actor, message)(new Timeout(timeoutMillis)).asInstanceOf[Future[AnyRef]] + + /** + * Register an onComplete callback on this [[akka.dispatch.Future]] to send + * the result to the given actor reference. Returns the original Future to + * allow method chaining. + * + * Recommended usage example: + * + * {{{ + * final Future f = Patterns.ask(worker, request, timeout); + * // apply some transformation (i.e. enrich with request info) + * final Future transformed = f.map(new akka.japi.Function() { ... }); + * // send it on to the next stage + * Patterns.pipeTo(transformed, nextActor); + * }}} + */ + def pipeTo[T](future: Future[T], actorRef: ActorRef): Future[T] = akka.pattern.pipeTo(future, actorRef) /** * Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when @@ -27,4 +101,4 @@ object Patterns { def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] = { akka.pattern.gracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]] } -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala new file mode 100644 index 0000000000..26f3b68e38 --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.pattern + +import akka.actor.ActorRef +import akka.dispatch.Future + +object PipeToSupport { + + class PipeableFuture[T](val future: Future[T]) { + def pipeTo(actorRef: ActorRef): Future[T] = akka.pattern.pipeTo(future, actorRef) + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala index b09ee56897..ac8fcf2df2 100644 --- a/akka-actor/src/main/scala/akka/pattern/package.scala +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -3,23 +3,142 @@ */ package akka -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.actor.ActorTimeoutException -import akka.actor.PoisonPill -import akka.actor.Props -import akka.actor.ReceiveTimeout -import akka.actor.Terminated -import akka.dispatch.Future -import akka.dispatch.Promise -import akka.util.Duration +import akka.actor._ +import akka.dispatch.{ Future, Promise } +import akka.util.{ Timeout, Duration } /** - * Akka patterns that provide solutions to commonly occurring problems. + * == Commonly Used Patterns With Akka == + * + * This package is used as a collection point for usage patterns which involve + * actors, futures, etc. but are loosely enough coupled to (multiple of) them + * to present them separately from the core implementation. Currently supported + * are: + * + *
    + *
  • ask: create a temporary one-off actor for receiving a reply to a + * message and complete a [[akka.dispatch.Future]] with it; returns said + * Future.
  • + *
  • pipeTo: feed eventually computed value of a future to an actor as + * a message.
  • + *
+ * + * In Scala the recommended usage is to import the pattern from the package + * object: + * {{{ + * import akka.pattern.ask + * + * ask(actor, message) // use it directly + * actor ask message // use it by implicit conversion + * }}} + * + * For Java the patterns are available as static methods of the [[akka.pattern.Patterns]] + * class: + * {{{ + * import static akka.pattern.Patterns.ask; + * + * ask(actor, message); + * }}} */ package object pattern { + /** + * Import this implicit conversion to gain `?` and `ask` methods on + * [[akka.actor.ActorRef]], which will defer to the + * `ask(actorRef, message)(timeout)` method defined here. + * + * {{{ + * import akka.pattern.ask + * + * val future = actor ? message // => ask(actor, message) + * val future = actor ask message // => ask(actor, message) + * val future = actor.ask(message)(timeout) // => ask(actor, message)(timeout) + * }}} + * + * All of the above use an implicit [[akka.actor.Timeout]]. + */ + implicit def ask(actorRef: ActorRef): AskSupport.AskableActorRef = new AskSupport.AskableActorRef(actorRef) + + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * val f = ask(worker, request)(timeout) + * flow { + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see [[akka.dispatch.Future]] for a description of `flow`] + */ + def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match { + case ref: InternalActorRef if ref.isTerminated ⇒ + actorRef.tell(message) + Promise.failed(new AskTimeoutException("sending to terminated ref breaks promises"))(ref.provider.dispatcher) + case ref: InternalActorRef ⇒ + val provider = ref.provider + if (timeout.duration.length <= 0) { + actorRef.tell(message) + Promise.failed(new AskTimeoutException("not asking with negative timeout"))(provider.dispatcher) + } else { + val a = AskSupport.createAsker(provider, timeout) + actorRef.tell(message, a) + a.result + } + case _ ⇒ throw new IllegalArgumentException("incompatible ActorRef " + actorRef) + } + + /** + * Import this implicit conversion to gain the `pipeTo` method on [[akka.dispatch.Future]]: + * + * {{{ + * import akka.pattern.pipeTo + * + * Future { doExpensiveCalc() } pipeTo nextActor + * }}} + */ + implicit def pipeTo[T](future: Future[T]): PipeToSupport.PipeableFuture[T] = new PipeToSupport.PipeableFuture(future) + + /** + * Register an onComplete callback on this [[akka.dispatch.Future]] to send + * the result to the given actor reference. Returns the original Future to + * allow method chaining. + * + * Recommended usage example: + * + * {{{ + * val f = ask(worker, request)(timeout) + * flow { + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see [[akka.dispatch.Future]] for a description of `flow`] + */ + def pipeTo[T](future: Future[T], actorRef: ActorRef): Future[T] = { + future onComplete { + case Right(r) ⇒ actorRef ! r + case Left(f) ⇒ actorRef ! Status.Failure(f) + } + future + } + /** * Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when * existing messages of the target actor has been processed and the actor has been diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 7fdbd1db5a..c6fb245d6a 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -4,7 +4,7 @@ package akka.routing import akka.actor._ -import akka.dispatch.Future +import akka.dispatch.{ Future, Promise } import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.TimeUnit @@ -12,6 +12,7 @@ import akka.util.{ Duration, Timeout } import akka.util.duration._ import com.typesafe.config.Config import akka.config.ConfigurationException +import akka.pattern.AskSupport import scala.collection.JavaConversions.iterableAsScalaIterable /** @@ -95,11 +96,6 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup } } - override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { - resize() - super.?(message)(timeout) - } - def resize() { for (r ← _props.routerConfig.resizer) { if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) { @@ -699,10 +695,7 @@ trait BroadcastLike { this: RouterConfig ⇒ routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) { - case (sender, message) ⇒ - message match { - case _ ⇒ toAll(sender, routeeProvider.routees) - } + case (sender, message) ⇒ toAll(sender, routeeProvider.routees) } } } @@ -774,12 +767,10 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ { case (sender, message) ⇒ - // FIXME avoid this cast - val asker = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(within)).get + val provider: ActorRefProvider = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider + val asker = AskSupport.createAsker(provider, within) asker.result.pipeTo(sender) - message match { - case _ ⇒ toAll(asker, routeeProvider.routees) - } + toAll(asker, routeeProvider.routees) } } } diff --git a/akka-actor/src/main/scala/akka/util/Duration.scala b/akka-actor/src/main/scala/akka/util/Duration.scala index 0442619c45..c75fa132ae 100644 --- a/akka-actor/src/main/scala/akka/util/Duration.scala +++ b/akka-actor/src/main/scala/akka/util/Duration.scala @@ -568,4 +568,3 @@ object Timeout { implicit def intToTimeout(timeout: Int) = new Timeout(timeout) implicit def longToTimeout(timeout: Long) = new Timeout(timeout) } - diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index 94f85268a3..af551d00c8 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -7,6 +7,7 @@ package akka.agent import akka.actor._ import akka.japi.{ Function ⇒ JFunc, Procedure ⇒ JProc } import akka.dispatch._ +import akka.pattern.ask import akka.util.Timeout import scala.concurrent.stm._ @@ -123,7 +124,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { * that new state can be obtained within the given timeout. */ def alter(f: T ⇒ T)(timeout: Timeout): Future[T] = { - def dispatch = updater.?(Alter(f), timeout).asInstanceOf[Future[T]] + def dispatch = ask(updater, Alter(f))(timeout).asInstanceOf[Future[T]] val txn = Txn.findCurrent if (txn.isDefined) { val result = Promise[T]()(system.dispatcher) @@ -171,7 +172,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { send((value: T) ⇒ { suspend() val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher("akka.agent.alter-off-dispatcher")) - result completeWith threadBased.?(Alter(f), timeout).asInstanceOf[Future[T]] + result completeWith ask(threadBased, Alter(f))(timeout).asInstanceOf[Future[T]] value }) result diff --git a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java index 2ed0c3d715..e33644409d 100644 --- a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java @@ -13,6 +13,7 @@ import akka.actor.Props; import akka.actor.Terminated; import akka.actor.UntypedActor; import akka.dispatch.Await; +import static akka.pattern.Patterns.ask; import akka.util.Duration; import akka.testkit.AkkaSpec; import akka.testkit.TestProbe; @@ -160,19 +161,19 @@ public class FaultHandlingTestBase { //#create Props superprops = new Props(Supervisor.class); ActorRef supervisor = system.actorOf(superprops, "supervisor"); - ActorRef child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); + ActorRef child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout); //#create //#resume child.tell(42); - assert Await.result(child.ask("get", 5000), timeout).equals(42); + assert Await.result(ask(child, "get", 5000), timeout).equals(42); child.tell(new ArithmeticException()); - assert Await.result(child.ask("get", 5000), timeout).equals(42); + assert Await.result(ask(child, "get", 5000), timeout).equals(42); //#resume //#restart child.tell(new NullPointerException()); - assert Await.result(child.ask("get", 5000), timeout).equals(0); + assert Await.result(ask(child, "get", 5000), timeout).equals(0); //#restart //#stop @@ -183,9 +184,9 @@ public class FaultHandlingTestBase { //#stop //#escalate-kill - child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); + child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout); probe.watch(child); - assert Await.result(child.ask("get", 5000), timeout).equals(0); + assert Await.result(ask(child, "get", 5000), timeout).equals(0); child.tell(new Exception()); probe.expectMsg(new Terminated(child)); //#escalate-kill @@ -193,11 +194,11 @@ public class FaultHandlingTestBase { //#escalate-restart superprops = new Props(Supervisor2.class); supervisor = system.actorOf(superprops, "supervisor2"); - child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); + child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout); child.tell(23); - assert Await.result(child.ask("get", 5000), timeout).equals(23); + assert Await.result(ask(child, "get", 5000), timeout).equals(23); child.tell(new Exception()); - assert Await.result(child.ask("get", 5000), timeout).equals(0); + assert Await.result(ask(child, "get", 5000), timeout).equals(0); //#escalate-restart //#testkit } diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java index b896f3691e..a72c828862 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java @@ -11,6 +11,7 @@ import akka.actor.Props; //#import-future import akka.dispatch.Future; +import akka.dispatch.Futures; import akka.dispatch.Await; import akka.util.Duration; import akka.util.Timeout; @@ -36,6 +37,17 @@ import akka.util.Duration; import akka.actor.ActorTimeoutException; //#import-gracefulStop +//#import-askPipeTo +import static akka.pattern.Patterns.ask; +import static akka.pattern.Patterns.pipeTo; +import akka.dispatch.Future; +import akka.dispatch.Futures; +import akka.util.Duration; +import akka.util.Timeout; +import java.util.concurrent.TimeUnit; +import java.util.ArrayList; +//#import-askPipeTo + import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; @@ -44,7 +56,10 @@ import akka.dispatch.MessageDispatcher; import org.junit.Test; import scala.Option; import java.lang.Object; +import java.util.ArrayList; +import java.util.Iterator; import java.util.concurrent.TimeUnit; +import akka.pattern.Patterns; import static org.junit.Assert.*; @@ -123,7 +138,7 @@ public class UntypedActorDocTestBase { }), "myactor"); //#using-ask - Future future = myActor.ask("Hello", 1000); + Future future = Patterns.ask(myActor, "Hello", 1000); Object result = Await.result(future, Duration.create(1, TimeUnit.SECONDS)); //#using-ask system.shutdown(); @@ -175,7 +190,7 @@ public class UntypedActorDocTestBase { public void useWatch() { ActorSystem system = ActorSystem.create("MySystem"); ActorRef myActor = system.actorOf(new Props(WatchActor.class)); - Future future = myActor.ask("kill", 1000); + Future future = Patterns.ask(myActor, "kill", 1000); assert Await.result(future, Duration.parse("1 second")).equals("finished"); system.shutdown(); } @@ -196,6 +211,43 @@ public class UntypedActorDocTestBase { //#gracefulStop system.shutdown(); } + + class Result { + final int x; + final String s; + public Result(int x, String s) { + this.x = x; + this.s = s; + } + } + + @Test + public void usePatternsAskPipeTo() { + ActorSystem system = ActorSystem.create("MySystem"); + ActorRef actorA = system.actorOf(new Props(MyUntypedActor.class)); + ActorRef actorB = system.actorOf(new Props(MyUntypedActor.class)); + ActorRef actorC = system.actorOf(new Props(MyUntypedActor.class)); + //#ask-pipeTo + final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS)); + + final ArrayList> futures = new ArrayList>(); + futures.add(ask(actorA, "request", 1000)); // using 1000ms timeout + futures.add(ask(actorB, "reqeest", t)); // using timeout from above + + final Future> aggregate = Futures.sequence(futures, system.dispatcher()); + + final Future transformed = aggregate.map(new akka.japi.Function, Result>() { + public Result apply(Iterable coll) { + final Iterator it = coll.iterator(); + final String s = (String) it.next(); + final int x = (Integer) it.next(); + return new Result(x, s); + } + }); + + pipeTo(transformed, actorC); + //#ask-pipeTo + } public static class MyActor extends UntypedActor { diff --git a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java index 3484c337e0..d2adfaf5fa 100644 --- a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java @@ -53,9 +53,9 @@ import akka.actor.Status.Failure; import akka.actor.ActorSystem; import akka.actor.UntypedActor; import akka.actor.ActorRef; -import akka.docs.actor.MyUntypedActor; import akka.actor.Props; import akka.dispatch.Futures; +import akka.pattern.Patterns; import static org.junit.Assert.*; @@ -79,7 +79,7 @@ public class FutureDocTestBase { String msg = "hello"; //#ask-blocking Timeout timeout = system.settings().ActorTimeout(); - Future future = actor.ask(msg, timeout); + Future future = Patterns.ask(actor, msg, timeout); String result = (String) Await.result(future, timeout.duration()); //#ask-blocking assertEquals("HELLO", result); diff --git a/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java b/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java index 3668bc1030..2a2e5c7f22 100644 --- a/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java +++ b/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java @@ -19,6 +19,7 @@ import akka.dispatch.Await; import akka.dispatch.Future; import akka.testkit.AkkaSpec; import com.typesafe.config.ConfigFactory; +import static akka.pattern.Patterns.ask; import static akka.docs.jrouting.CustomRouterDocTestBase.DemocratActor; import static akka.docs.jrouting.CustomRouterDocTestBase.RepublicanActor; @@ -48,8 +49,8 @@ public class CustomRouterDocTestBase { routedActor.tell(DemocratVote); routedActor.tell(RepublicanVote); Timeout timeout = new Timeout(Duration.parse("1 seconds")); - Future democratsResult = routedActor.ask(DemocratCountResult, timeout); - Future republicansResult = routedActor.ask(RepublicanCountResult, timeout); + Future democratsResult = ask(routedActor, DemocratCountResult, timeout); + Future republicansResult = ask(routedActor, RepublicanCountResult, timeout); assertEquals(3, Await.result(democratsResult, timeout.duration())); assertEquals(2, Await.result(republicansResult, timeout.duration())); diff --git a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java index 2125ae35a8..1119559489 100644 --- a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java +++ b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java @@ -55,8 +55,8 @@ public class ParentActor extends UntypedActor { new Props(FibonacciActor.class).withRouter(new ScatterGatherFirstCompletedRouter(5, Duration .parse("2 seconds"))), "router"); Timeout timeout = getContext().system().settings().ActorTimeout(); - Future futureResult = scatterGatherFirstCompletedRouter.ask(new FibonacciActor.FibonacciNumber(10), - timeout); + Future futureResult = akka.pattern.Patterns.ask( + scatterGatherFirstCompletedRouter, new FibonacciActor.FibonacciNumber(10), timeout); int result = (Integer) Await.result(futureResult, timeout.duration()); //#scatterGatherFirstCompletedRouter System.out.println(String.format("The result of calculating Fibonacci for 10 is %d", result)); diff --git a/akka-docs/java/code/akka/docs/transactor/TransactorDocTest.java b/akka-docs/java/code/akka/docs/transactor/TransactorDocTest.java index 0b1662aaab..f00713a007 100644 --- a/akka-docs/java/code/akka/docs/transactor/TransactorDocTest.java +++ b/akka-docs/java/code/akka/docs/transactor/TransactorDocTest.java @@ -10,6 +10,7 @@ import org.junit.Test; //#imports import akka.actor.*; import akka.dispatch.Await; +import static akka.pattern.Patterns.ask; import akka.transactor.Coordinated; import akka.util.Duration; import akka.util.Timeout; @@ -30,7 +31,7 @@ public class TransactorDocTest { counter1.tell(new Coordinated(new Increment(counter2), timeout)); - Integer count = (Integer) Await.result(counter1.ask("GetCount", timeout), timeout.duration()); + Integer count = (Integer) Await.result(ask(counter1, "GetCount", timeout), timeout.duration()); //#coordinated-example assertEquals(count, new Integer(1)); @@ -71,7 +72,7 @@ public class TransactorDocTest { counter.tell(coordinated.coordinate(new Increment())); coordinated.await(); - Integer count = (Integer) Await.result(counter.ask("GetCount", timeout), timeout.duration()); + Integer count = (Integer) Await.result(ask(counter, "GetCount", timeout), timeout.duration()); assertEquals(count, new Integer(1)); system.shutdown(); @@ -88,10 +89,10 @@ public class TransactorDocTest { friendlyCounter.tell(coordinated.coordinate(new Increment(friend))); coordinated.await(); - Integer count1 = (Integer) Await.result(friendlyCounter.ask("GetCount", timeout), timeout.duration()); + Integer count1 = (Integer) Await.result(ask(friendlyCounter, "GetCount", timeout), timeout.duration()); assertEquals(count1, new Integer(1)); - Integer count2 = (Integer) Await.result(friend.ask("GetCount", timeout), timeout.duration()); + Integer count2 = (Integer) Await.result(ask(friend, "GetCount", timeout), timeout.duration()); assertEquals(count2, new Integer(1)); system.shutdown(); diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index c4e2bf8150..d8f72a71ef 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -316,26 +316,37 @@ If invoked without the sender parameter the sender will be Ask: Send-And-Receive-Future ---------------------------- -Using ``?`` will send a message to the receiving Actor asynchronously and -will immediately return a :class:`Future` which will be completed with -an ``akka.actor.AskTimeoutException`` after the specified timeout: +The ``ask`` pattern involves actors as well as futures, hence it is offered as +a use pattern rather than a method on :class:`ActorRef`: -.. code-block:: java +.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#import-askPipeTo - long timeoutMillis = 1000; - Future future = actorRef.ask("Hello", timeoutMillis); +.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#ask-pipeTo -The receiving actor should reply to this message, which will complete the -future with the reply message as value; ``getSender.tell(result)``. +This example demonstrates ``ask`` together with the ``pipeTo`` pattern on +futures, because this is likely to be a common combination. Please note that +all of the above is completely non-blocking and asynchronous: ``ask`` produces +a :class:`Future`, two of which are composed into a new future using the +:meth:`Futures.sequence` and :meth:`map` methods and then ``pipeTo`` installs +an ``onComplete``-handler on the future to effect the submission of the +aggregated :class:`Result` to another actor. + +Using ``ask`` will send a message to the receiving Actor as with ``tell``, and +the receiving actor must reply with ``getSender().tell(reply)`` in order to +complete the returned :class:`Future` with a value. The ``ask`` operation +involves creating an internal actor for handling this reply, which needs to +have a timeout after which it is destroyed in order not to leak resources; see +more below. To complete the future with an exception you need send a Failure message to the sender. -This is not done automatically when an actor throws an exception while processing a +This is *not done automatically* when an actor throws an exception while processing a message. .. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#reply-exception If the actor does not complete the future, it will expire after the timeout period, -specified as parameter to the ``ask`` method. +specified as parameter to the ``ask`` method; this will complete the +:class:`Future` with an :class:`AskTimeoutException`. See :ref:`futures-java` for more information on how to await or query a future. @@ -354,15 +365,6 @@ Gives you a way to avoid blocking. there is not yet a way to detect these illegal accesses at compile time. See also: :ref:`jmm-shared-state` -The future returned from the ``ask`` method can conveniently be passed around or -chained with further processing steps, but sometimes you just need the value, -even if that entails waiting for it (but keep in mind that waiting inside an -actor is prone to dead-locks, e.g. if obtaining the result depends on -processing another message on this actor). - -.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java - :include: import-future,using-ask - Forward message --------------- diff --git a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst index 6199105e0f..57a19fb393 100644 --- a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst +++ b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst @@ -22,8 +22,8 @@ anything is able to run again. Therefore we provide a migration kit that makes it possible to do the migration changes in smaller steps. The migration kit only covers the most common usage of Akka. It is not intended -as a final solution. The whole migration kit is deprecated and will be removed in -Akka 2.1. +as a final solution. The whole migration kit is marked as deprecated and will +be removed in Akka 2.1. The migration kit is provided in separate jar files. Add the following dependency:: @@ -136,7 +136,8 @@ v1.3:: v2.0:: - system.shutdown() + system.shutdown() // from outside of this system + context.system.shutdown() // from inside any actor Documentation: @@ -149,7 +150,11 @@ Identifying Actors In v1.3 actors have ``uuid`` and ``id`` field. In v2.0 each actor has a unique logical ``path``. The ``ActorRegistry`` has been replaced by actor paths and lookup with -``actorFor`` in ``ActorRefProvider`` (``ActorSystem`` or ``ActorContext``). +``actorFor`` in ``ActorRefProvider`` (``ActorSystem`` or ``ActorContext``). It +is no longer possible to obtain references to all actors being implemented by a +certain class (the reason being that this property is not known yet when an +:class:`ActorRef` is created because instantiation of the actor itself is +asynchronous). v1.3:: @@ -170,7 +175,9 @@ Reply to messages ^^^^^^^^^^^^^^^^^ ``self.channel`` has been replaced with unified reply mechanism using ``sender`` (Scala) -or ``getSender()`` (Java). This works for both tell (!) and ask (?). +or ``getSender()`` (Java). This works for both tell (!) and ask (?). Sending to +an actor reference never throws an exception, hence :meth:`tryTell` and +:meth:`tryReply` are removed. v1.3:: @@ -200,11 +207,61 @@ reply to be received; it is independent of the timeout applied when awaiting completion of the :class:`Future`, however, the actor will complete the :class:`Future` with an :class:`AskTimeoutException` when it stops itself. +Since there is no good library default value for the ask-timeout, specification +of a timeout is required for all usages as shown below. + +Also, since the ``ask`` feature is coupling futures and actors, it is no longer +offered on the :class:`ActorRef` itself, but instead as a use pattern to be +imported. While Scala’s implicit conversions enable transparent replacement, +Java code will have to be changed by more than just adding an import statement. + +v1.3:: + + actorRef ? message // Scala + actorRef.ask(message, timeout); // Java + +v2.0 (Scala):: + + import akka.pattern.ask + + implicit val timeout: Timeout = ... + actorRef ? message // uses implicit timeout + actorRef ask message // uses implicit timeout + actorRef.ask(message)(timeout) // uses explicit timeout + ask(actorRef, message) // uses implicit timeout + ask(actorRef, message)(timeout) // uses explicit timeout + +v2.0 (Java):: + + import akka.pattern.Patterns; + + Patterns.ask(actorRef, message, timeout) + Documentation: * :ref:`actors-scala` * :ref:`untyped-actors-java` +``ActorRef.?(msg, timeout)`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +This method has a dangerous overlap with ``ActorRef.?(msg)(implicit timeout)`` +due to the fact that Scala allows to pass a :class:`Tuple` in place of the +message without requiring extra parentheses:: + + actor ? (1, "hallo") // will send a tuple + actor ? (1, Timeout()) // will send 1 with an explicit timeout + +To remove this ambiguity, the latter variant is removed in version 2.0. If you +were using it before, it will now send tuples where that is not desired. In +order to correct all places in the code where this happens, simply import +``akka.migration.ask`` instead of ``akka.pattern.ask`` to obtain a variant +which will give deprecation warnings where the old method signature is used:: + + import akka.migration.ask + + actor ? (1, Timeout(2 seconds)) // will give deprecation warning + ActorPool ^^^^^^^^^ @@ -305,7 +362,8 @@ v2.0:: import akka.event.Logging - val log = Logging(context.system, this) + val log = Logging(context.system, this) // will include system name in message source + val log = Logging(system.eventStream, getClass.getName) // will not include system name log.error(exception, message) log.warning(message) log.info(message) @@ -485,17 +543,25 @@ Documentation: Spawn ^^^^^ -``spawn`` has been removed and can be implemented like this, if needed. Be careful to not +``spawn`` has been removed and should be replaced by creating a :class:`Future`. Be careful to not access any shared mutable state closed over by the body. -:: +Scala:: - def spawn(body: ⇒ Unit) { - system.actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.stop(ctx.self) })) ! "go" - } + Future { doSomething() } // will be executed asynchronously + +Java:: + + Futures.future(new Callable() { + public String call() { + doSomething(); + } + }, executionContext); Documentation: + * :ref:`futures-scala` + * :ref:`futures-java` * :ref:`jmm` HotSwap @@ -505,7 +571,10 @@ In v2.0 ``become`` and ``unbecome`` metods are located in ``ActorContext``, i.e. The special ``HotSwap`` and ``RevertHotswap`` messages in v1.3 has been removed. Similar can be implemented with your own message and using ``context.become`` and ``context.unbecome`` -in the actor receiving the message. +in the actor receiving the message. The rationale is that being able to replace +any actor’s behavior generically is not a good idea because actor implementors +would have no way to defend against that; hence the change to lay it into the +hands of the actor itself. * :ref:`actors-scala` * :ref:`untyped-actors-java` diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index e5f12587be..b7f66c6add 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -326,9 +326,9 @@ Send messages Messages are sent to an Actor through one of the following methods. * ``!`` means “fire-and-forget”, e.g. send a message asynchronously and return - immediately. Also know as ``tell``. + immediately. Also known as ``tell``. * ``?`` sends a message asynchronously and returns a :class:`Future` - representing a possible reply. Also know as ``ask``. + representing a possible reply. Also known as ``ask``. Message ordering is guaranteed on a per-sender basis. @@ -353,25 +353,34 @@ If invoked from an instance that is **not** an Actor the sender will be Ask: Send-And-Receive-Future ---------------------------- -Using ``?`` will send a message to the receiving Actor asynchronously and -will immediately return a :class:`Future` which will be completed with -an ``akka.actor.AskTimeoutException`` after the specified timeout: +The ``ask`` pattern involves actors as well as futures, hence it is offered as +a use pattern rather than a method on :class:`ActorRef`: -.. code-block:: scala +.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#ask-pipeTo - val future = actor ? "hello" +This example demonstrates ``ask`` together with the ``pipeTo`` pattern on +futures, because this is likely to be a common combination. Please note that +all of the above is completely non-blocking and asynchronous: ``ask`` produces +a :class:`Future`, three of which are composed into a new future using the +for-comprehension and then ``pipeTo`` installs an ``onComplete``-handler on the +future to effect the submission of the aggregated :class:`Result` to another +actor. -The receiving actor should reply to this message, which will complete the -future with the reply message as value; ``sender ! result``. +Using ``ask`` will send a message to the receiving Actor as with ``tell``, and +the receiving actor must reply with ``sender ! reply`` in order to complete the +returned :class:`Future` with a value. The ``ask`` operation involves creating +an internal actor for handling this reply, which needs to have a timeout after +which it is destroyed in order not to leak resources; see more below. To complete the future with an exception you need send a Failure message to the sender. -This is not done automatically when an actor throws an exception while processing a +This is *not done automatically* when an actor throws an exception while processing a message. .. includecode:: code/akka/docs/actor/ActorDocSpec.scala#reply-exception -If the actor does not complete the future, it will expire after the timeout period, -which is taken from one of the following locations in order of precedence: +If the actor does not complete the future, it will expire after the timeout +period, completing it with an :class:`AskTimeoutException`. The timeout is +taken from one of the following locations in order of precedence: 1. explicitly given timeout as in: @@ -399,23 +408,6 @@ Gives you a way to avoid blocking. there is not yet a way to detect these illegal accesses at compile time. See also: :ref:`jmm-shared-state` -The future returned from the ``?`` method can conveniently be passed around or -chained with further processing steps, but sometimes you just need the value, -even if that entails waiting for it (but keep in mind that waiting inside an -actor is prone to dead-locks, e.g. if obtaining the result depends on -processing another message on this actor). - -For this purpose, there is the method :meth:`Future.as[T]` which waits until -either the future is completed or its timeout expires, whichever comes first. -The result is then inspected and returned as :class:`Some[T]` if it was -normally completed and the answer’s runtime type matches the desired type; if -the future contains an exception or the value cannot be cast to the desired -type, it will throw the exception or a :class:`ClassCastException` (if you want -to get :obj:`None` in the latter case, use :meth:`Future.asSilently[T]`). In -case of a timeout, :obj:`None` is returned. - -.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#using-ask - Forward message --------------- diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index 98072cabd1..a4c903b564 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -212,31 +212,12 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { system.stop(myActor) } - "using ask" in { - //#using-ask - class MyActor extends Actor { - def receive = { - case x: String ⇒ sender ! x.toUpperCase - case n: Int ⇒ sender ! (n + 1) - } - } - - val myActor = system.actorOf(Props(new MyActor), name = "myactor") - implicit val timeout = system.settings.ActorTimeout - val future = myActor ? "hello" - for (x ← future) println(x) //Prints "hello" - - val result: Future[Int] = for (x ← (myActor ? 3).mapTo[Int]) yield { 2 * x } - //#using-ask - - system.stop(myActor) - } - "using implicit timeout" in { val myActor = system.actorOf(Props(new FirstActor)) //#using-implicit-timeout import akka.util.duration._ import akka.util.Timeout + import akka.pattern.ask implicit val timeout = Timeout(500 millis) val future = myActor ? "hello" //#using-implicit-timeout @@ -248,7 +229,8 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val myActor = system.actorOf(Props(new FirstActor)) //#using-explicit-timeout import akka.util.duration._ - val future = myActor ? ("hello", timeout = 500 millis) + import akka.pattern.ask + val future = myActor.ask("hello")(500 millis) //#using-explicit-timeout Await.result(future, 500 millis) must be("hello") } @@ -327,6 +309,28 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { case e: ActorTimeoutException ⇒ // the actor wasn't stopped within 5 seconds } //#gracefulStop - } + + "using pattern ask / pipeTo" in { + val actorA, actorB, actorC, actorD = system.actorOf(Props.empty) + //#ask-pipeTo + import akka.pattern.{ ask, pipeTo } + + case class Result(x: Int, s: String, d: Double) + case object Request + + implicit val timeout = Timeout(5 seconds) // needed for `?` below + + val f: Future[Result] = + for { + x ← ask(actorA, Request).mapTo[Int] // call pattern directly + s ← actorB ask Request mapTo manifest[String] // call by implicit conversion + d ← actorC ? Request mapTo manifest[Double] // call by symbolic name + } yield Result(x, s, d) + + f pipeTo actorD // .. or .. + pipeTo(f, actorD) + //#ask-pipeTo + } + } diff --git a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala index 47a854d403..69bfe31899 100644 --- a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala @@ -44,9 +44,10 @@ class FutureDocSpec extends AkkaSpec { val msg = "hello" //#ask-blocking import akka.dispatch.Await + import akka.pattern.ask implicit val timeout = system.settings.ActorTimeout - val future = actor ? msg + val future = actor ? msg // enabled by the “ask” import val result = Await.result(future, timeout.duration).asInstanceOf[String] //#ask-blocking result must be("HELLO") @@ -58,8 +59,9 @@ class FutureDocSpec extends AkkaSpec { implicit val timeout = system.settings.ActorTimeout //#map-to import akka.dispatch.Future + import akka.pattern.ask - val future: Future[String] = (actor ? msg).mapTo[String] + val future: Future[String] = ask(actor, msg).mapTo[String] //#map-to Await.result(future, timeout.duration) must be("HELLO") } @@ -147,15 +149,16 @@ class FutureDocSpec extends AkkaSpec { val msg2 = 2 implicit val timeout = system.settings.ActorTimeout import akka.dispatch.Await + import akka.pattern.ask //#composing-wrong - val f1 = actor1 ? msg1 - val f2 = actor2 ? msg2 + val f1 = ask(actor1, msg1) + val f2 = ask(actor2, msg2) val a = Await.result(f1, 1 second).asInstanceOf[Int] val b = Await.result(f2, 1 second).asInstanceOf[Int] - val f3 = actor3 ? (a + b) + val f3 = ask(actor3, (a + b)) val result = Await.result(f3, 1 second).asInstanceOf[Int] //#composing-wrong @@ -170,15 +173,16 @@ class FutureDocSpec extends AkkaSpec { val msg2 = 2 implicit val timeout = system.settings.ActorTimeout import akka.dispatch.Await + import akka.pattern.ask //#composing - val f1 = actor1 ? msg1 - val f2 = actor2 ? msg2 + val f1 = ask(actor1, msg1) + val f2 = ask(actor2, msg2) val f3 = for { a ← f1.mapTo[Int] b ← f2.mapTo[Int] - c ← (actor3 ? (a + b)).mapTo[Int] + c ← ask(actor3, (a + b)).mapTo[Int] } yield c val result = Await.result(f3, 1 second).asInstanceOf[Int] @@ -191,7 +195,7 @@ class FutureDocSpec extends AkkaSpec { val oddActor = system.actorOf(Props[OddActor]) //#sequence-ask // oddActor returns odd numbers sequentially from 1 as a List[Future[Int]] - val listOfFutures = List.fill(100)((oddActor ? GetNext).mapTo[Int]) + val listOfFutures = List.fill(100)(akka.pattern.ask(oddActor, GetNext).mapTo[Int]) // now we have a Future[List[Int]] val futureList = Future.sequence(listOfFutures) @@ -239,7 +243,7 @@ class FutureDocSpec extends AkkaSpec { val actor = system.actorOf(Props[MyActor]) val msg1 = -1 //#recover - val future = actor ? msg1 recover { + val future = akka.pattern.ask(actor, msg1) recover { case e: ArithmeticException ⇒ 0 } //#recover diff --git a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala index 98554a56f7..2f0a1e634c 100644 --- a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala +++ b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala @@ -8,6 +8,7 @@ import annotation.tailrec import akka.actor.{ Props, Actor } import akka.util.duration._ import akka.dispatch.Await +import akka.pattern.ask import akka.routing.SmallestMailboxRouter case class FibonacciNumber(nbr: Int) diff --git a/akka-docs/scala/code/akka/docs/testkit/TestkitDocSpec.scala b/akka-docs/scala/code/akka/docs/testkit/TestkitDocSpec.scala index c1815d36a0..22285851f4 100644 --- a/akka-docs/scala/code/akka/docs/testkit/TestkitDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/testkit/TestkitDocSpec.scala @@ -7,6 +7,7 @@ package akka.docs.testkit import akka.testkit.TestProbe import akka.util.duration._ import akka.actor._ +import akka.dispatch.Futures //#imports-test-probe @@ -119,6 +120,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { import akka.testkit.TestActorRef import akka.util.duration._ import akka.dispatch.Await + import akka.pattern.ask val actorRef = TestActorRef(new MyActor) // hypothetical message stimulating a '42' answer @@ -202,6 +204,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { "demonstrate probe reply" in { import akka.testkit.TestProbe import akka.util.duration._ + import akka.pattern.ask //#test-probe-reply val probe = TestProbe() val future = probe.ref ? "hello" diff --git a/akka-docs/scala/code/akka/docs/transactor/TransactorDocSpec.scala b/akka-docs/scala/code/akka/docs/transactor/TransactorDocSpec.scala index c45b252ffd..fa76f54744 100644 --- a/akka-docs/scala/code/akka/docs/transactor/TransactorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/transactor/TransactorDocSpec.scala @@ -141,6 +141,7 @@ class TransactorDocSpec extends AkkaSpec { import akka.dispatch.Await import akka.util.duration._ import akka.util.Timeout + import akka.pattern.ask val system = ActorSystem("app") diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 57742c746a..2ff25df939 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -22,6 +22,7 @@ import scala.annotation.tailrec import com.google.protobuf.ByteString import java.util.concurrent.TimeoutException import akka.dispatch.Await +import akka.pattern.ask /** * Interface for node membership change listener. @@ -251,7 +252,7 @@ class Gossiper(remote: Remote, system: ActorSystemImpl) { try { val t = remoteSettings.RemoteSystemDaemonAckTimeout - Await.result(connection ? (newGossip, t), t) match { + Await.result(connection.?(newGossip)(t), t) match { case Success(receiver) ⇒ log.debug("Gossip sent to [{}] was successfully received", receiver) case Failure(cause) ⇒ log.error(cause, cause.toString) } diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index afaa14ec13..923ccdc85d 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -118,7 +118,7 @@ case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMs * It acts as the brain of the remote that responds to system remote events (messages) and undertakes action. */ class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) - extends VirtualPathContainer(_path, _parent, _log) { + extends VirtualPathContainer(system.provider, _path, _parent, _log) { /** * Find the longest matching path which we know about and return that ref diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 12aba95b85..378d82096b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -35,6 +35,11 @@ class RemoteActorRefProvider( def terminationFuture = local.terminationFuture def dispatcher = local.dispatcher + def registerTempActor(actorRef: InternalActorRef, path: ActorPath) = local.registerTempActor(actorRef, path) + def unregisterTempActor(path: ActorPath) = local.unregisterTempActor(path) + def tempPath() = local.tempPath() + def tempContainer = local.tempContainer + val deployer = new RemoteDeployer(settings) val remote = new Remote(settings, remoteSettings) @@ -136,8 +141,6 @@ class RemoteActorRefProvider( def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path) - def ask(within: Timeout): Option[AskActorRef] = local.ask(within) - /** * Using (checking out) actor on a specific node. */ @@ -158,7 +161,7 @@ trait RemoteRef extends ActorRefScope { * This reference is network-aware (remembers its origin) and immutable. */ private[akka] class RemoteActorRef private[akka] ( - provider: RemoteActorRefProvider, + val provider: RemoteActorRefProvider, remote: RemoteSupport[ParsedTransportAddress], val path: ActorPath, val getParent: InternalActorRef, @@ -183,17 +186,6 @@ private[akka] class RemoteActorRef private[akka] ( override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this, loader) - override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { - provider.ask(timeout) match { - case Some(a) ⇒ - this.!(message)(a) - a.result - case None ⇒ - this.!(message)(null) - Promise[Any]()(provider.dispatcher) - } - } - def suspend(): Unit = sendSystemMessage(Suspend()) def resume(): Unit = sendSystemMessage(Resume()) diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala index 97b0e9c7e0..bbf653a808 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala @@ -5,6 +5,7 @@ import akka.routing._ import akka.actor.{ Actor, Props } import akka.testkit._ import akka.dispatch.Await +import akka.pattern.ask object DirectRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { override def NrOfNodes = 2 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala index e6ed14419f..985e8bf835 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala @@ -6,6 +6,7 @@ import akka.routing._ import akka.testkit._ import akka.util.duration._ import akka.dispatch.Await +import akka.pattern.ask object NewRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { override def NrOfNodes = 2 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala index c2cc058f8d..2c2f0c154f 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala @@ -5,6 +5,7 @@ import akka.remote._ import akka.routing._ import akka.testkit.DefaultTimeout import akka.dispatch.Await +import akka.pattern.ask object RandomRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { override def NrOfNodes = 4 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala index 38e3182957..f8ae8f635c 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala @@ -5,6 +5,7 @@ import akka.remote._ import akka.routing._ import akka.testkit.DefaultTimeout import akka.dispatch.Await +import akka.pattern.ask object RoundRobinRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { override def NrOfNodes = 4 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala b/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala index 96bccb13fa..156c955566 100755 --- a/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe + * Copyright (C) 2011-2012 Typesafe */ package akka.remote diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index f65b4e5ed8..797099092b 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -7,6 +7,7 @@ import akka.testkit._ import akka.actor._ import com.typesafe.config._ import akka.dispatch.Await +import akka.pattern.ask object RemoteCommunicationSpec { class Echo extends Actor { @@ -81,8 +82,8 @@ akka { "support ask" in { Await.result(here ? "ping", timeout.duration) match { - case ("pong", s: AskActorRef) ⇒ // good - case m ⇒ fail(m + " was not (pong, AskActorRef)") + case ("pong", s: akka.pattern.AskSupport.PromiseActorRef) ⇒ // good + case m ⇒ fail(m + " was not (pong, AskActorRef)") } } diff --git a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala index ad6d2f13ea..5031723f44 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2011 Typesafe + * Copyright (C) 2011-2012 Typesafe */ package akka.sbt diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 1c1e6d03de..7b3ee7ebce 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicLong import akka.event.EventStream import scala.collection.immutable.Stack import akka.dispatch._ +import akka.pattern.ask /** * This special ActorRef is exclusively for use during unit testing in a single-threaded environment. Therefore, it @@ -73,7 +74,7 @@ class TestActorRef[T <: Actor]( underlying.actor.asInstanceOf[T] match { case null ⇒ val t = underlying.system.settings.ActorTimeout - Await.result(?(InternalGetActor)(t), t.duration).asInstanceOf[T] + Await.result(this.?(InternalGetActor)(t), t.duration).asInstanceOf[T] case ref ⇒ ref } } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 971c794174..c8db05b171 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -17,6 +17,7 @@ import akka.actor.DeadLetter import java.util.concurrent.TimeoutException import akka.dispatch.{ Await, MessageDispatcher } import akka.dispatch.Dispatchers +import akka.pattern.ask object TimingTest extends Tag("timing") diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 97df807119..eee9318c02 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -10,6 +10,7 @@ import akka.event.Logging.Warning import akka.dispatch.{ Future, Promise, Await } import akka.util.duration._ import akka.actor.ActorSystem +import akka.pattern.ask import akka.dispatch.Dispatcher /** diff --git a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala index 4723070299..516bba7322 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala @@ -6,6 +6,7 @@ import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ import akka.util.duration._ import akka.dispatch.{ Await, Future } +import akka.pattern.ask @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class TestProbeSpec extends AkkaSpec with DefaultTimeout { diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java index 33c1e7653a..66d666dfd3 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java @@ -18,6 +18,7 @@ import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import akka.dispatch.Await; import akka.dispatch.Future; +import static akka.pattern.Patterns.ask; import akka.testkit.AkkaSpec; import akka.testkit.EventFilter; import akka.testkit.ErrorFilter; @@ -80,7 +81,7 @@ public class UntypedCoordinatedIncrementTest { } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Future future = counter.ask("GetCount", timeout); + Future future = ask(counter, "GetCount", timeout); int count = (Integer) Await.result(future, timeout.duration()); assertEquals(1, count); } @@ -102,7 +103,7 @@ public class UntypedCoordinatedIncrementTest { } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Futurefuture = counter.ask("GetCount", timeout); + Futurefuture = ask(counter, "GetCount", timeout); int count = (Integer) Await.result(future, timeout.duration()); assertEquals(0, count); } diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java index d59b1315dc..5820c82573 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java @@ -18,6 +18,7 @@ import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import akka.dispatch.Await; import akka.dispatch.Future; +import static akka.pattern.Patterns.ask; import akka.testkit.AkkaSpec; import akka.testkit.EventFilter; import akka.testkit.ErrorFilter; @@ -81,7 +82,7 @@ public class UntypedTransactorTest { } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Future future = counter.ask("GetCount", timeout); + Future future = ask(counter, "GetCount", timeout); int count = (Integer) Await.result(future, timeout.duration()); assertEquals(1, count); } @@ -103,7 +104,7 @@ public class UntypedTransactorTest { } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Future future = counter.ask("GetCount", timeout); + Future future = ask(counter, "GetCount", timeout); int count = (Integer) Await.result(future, timeout.duration()); assertEquals(0, count); } diff --git a/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala b/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala index dc993a2201..265d4a9eaf 100644 --- a/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala @@ -12,6 +12,7 @@ import akka.util.duration._ import akka.util.Timeout import akka.testkit._ import scala.concurrent.stm._ +import akka.pattern.ask object CoordinatedIncrement { diff --git a/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala b/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala index 9d39e50189..4f7fc89c14 100644 --- a/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala @@ -15,6 +15,7 @@ import akka.testkit.TestEvent.Mute import scala.concurrent.stm._ import scala.util.Random.{ nextInt ⇒ random } import java.util.concurrent.CountDownLatch +import akka.pattern.ask object FickleFriends { case class FriendlyIncrement(friends: Seq[ActorRef], timeout: Timeout, latch: CountDownLatch) diff --git a/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala b/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala index 13b04b6bab..1954c9a13b 100644 --- a/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala @@ -10,6 +10,7 @@ import akka.util.duration._ import akka.util.Timeout import akka.testkit._ import scala.concurrent.stm._ +import akka.pattern.ask object TransactorIncrement { case class Increment(friends: Seq[ActorRef], latch: TestLatch) diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala index 68adc998d5..0a7b53bc8d 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala @@ -7,6 +7,7 @@ import akka.util.duration._ import org.zeromq.{ ZMQ ⇒ JZMQ } import akka.actor._ import akka.dispatch.{ Await } +import akka.pattern.ask /** * A Model to represent a version of the zeromq library