diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 3e9a0a91f4..e375f29e4b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -287,9 +287,13 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor protected def systemImpl = this + @inline private def askAndAwait(actorRef: ActorRef, message: Any)(implicit timeout: akka.util.Timeout): Any = { + Await.result(Futures.ask(actorRef, message), timeout.duration) + } + private[akka] def systemActorOf(props: Props, name: String): ActorRef = { implicit val timeout = settings.CreationTimeout - Await.result(Futures.ask(systemGuardian, CreateChild(props, name)), timeout.duration) match { + askAndAwait(systemGuardian, CreateChild(props, name)) match { case ref: ActorRef ⇒ ref case ex: Exception ⇒ throw ex } @@ -297,7 +301,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor def actorOf(props: Props, name: String): ActorRef = { implicit val timeout = settings.CreationTimeout - Await.result(Futures.ask(guardian, CreateChild(props, name)), timeout.duration) match { + askAndAwait(guardian, CreateChild(props, name)) match { case ref: ActorRef ⇒ ref case ex: Exception ⇒ throw ex } @@ -305,7 +309,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor def actorOf(props: Props): ActorRef = { implicit val timeout = settings.CreationTimeout - Await.result(Futures.ask(guardian, CreateRandomNameChild(props)), timeout.duration) match { + askAndAwait(guardian, CreateRandomNameChild(props)) match { case ref: ActorRef ⇒ ref case ex: Exception ⇒ throw ex } @@ -317,8 +321,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor val guard = guardian.path val sys = systemGuardian.path path.parent match { - case `guard` ⇒ Await.result(Futures.ask(guardian, StopChild(actor)), timeout.duration) - case `sys` ⇒ Await.result(Futures.ask(systemGuardian, StopChild(actor)), timeout.duration) + case `guard` ⇒ askAndAwait(guardian, StopChild(actor)) + case `sys` ⇒ askAndAwait(systemGuardian, StopChild(actor)) case _ ⇒ actor.asInstanceOf[InternalActorRef].stop() } } diff --git a/akka-actor/src/main/scala/akka/actor/package.scala b/akka-actor/src/main/scala/akka/actor/package.scala index 0b06470c79..cedc7befd7 100644 --- a/akka-actor/src/main/scala/akka/actor/package.scala +++ b/akka-actor/src/main/scala/akka/actor/package.scala @@ -41,4 +41,26 @@ package object actor { } } + // Implicit for converting a Promise to an actor. + // Symmetric to the future2actor conversion, which allows + // piping a Future result (read side) to an Actor's mailbox, this + // conversion allows using an Actor to complete a Promise (write side) + // + // Future.ask / actor ? message is now a trivial implementation that can + // also be done in user code (assuming actorRef, timeout and dispatcher implicits): + // + // Future.ask(actor, message) = { + // val promise = Promise[Any]() + // actor ! (message, promise) + // promise + // } + + @inline implicit def promise2actor(promise: akka.dispatch.Promise[Any])(implicit actorRef: ActorRef, timeout: akka.util.Timeout) = { + val provider = actorRef.asInstanceOf[InternalActorRef].provider + provider.ask(promise, timeout) match { + case Some(ref) ⇒ ref + case None ⇒ null + } + } + } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index fc4225600d..6eb408fd7a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -55,14 +55,10 @@ object Await { object Futures { def ask(actor: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = { - val provider = actor.asInstanceOf[InternalActorRef].provider - val promise = Promise[Any]()(provider.dispatcher) - provider.ask(promise, timeout) match { - case Some(a) ⇒ - actor.!(message)(a) - case None ⇒ - actor.!(message)(null) - } + implicit val dispatcher = actor.asInstanceOf[InternalActorRef].provider.dispatcher + implicit val actorRefContext = actor // for promise2actor implicit conversion + val promise = Promise[Any]() + actor.!(message)(promise) promise } diff --git a/akka-docs/java/code/akka/docs/transactor/TransactorDocTest.java b/akka-docs/java/code/akka/docs/transactor/TransactorDocTest.java index e6b45f675c..75fa92cd8f 100644 --- a/akka-docs/java/code/akka/docs/transactor/TransactorDocTest.java +++ b/akka-docs/java/code/akka/docs/transactor/TransactorDocTest.java @@ -10,6 +10,7 @@ import org.junit.Test; //#imports import akka.actor.*; import akka.dispatch.Await; +import static akka.dispatch.Futures.ask; import akka.transactor.Coordinated; import akka.util.Duration; import akka.util.Timeout; @@ -30,7 +31,7 @@ public class TransactorDocTest { counter1.tell(new Coordinated(new Increment(counter2), timeout)); - Integer count = (Integer) Await.result(counter1.ask("GetCount", timeout), timeout.duration()); + Integer count = (Integer) Await.result(ask(counter1, "GetCount", timeout), timeout.duration()); //#coordinated-example assertEquals(count, new Integer(1)); @@ -71,7 +72,7 @@ public class TransactorDocTest { counter.tell(coordinated.coordinate(new Increment())); coordinated.await(); - Integer count = (Integer) Await.result(counter.ask("GetCount", timeout), timeout.duration()); + Integer count = (Integer) Await.result(ask(counter, "GetCount", timeout), timeout.duration()); assertEquals(count, new Integer(1)); system.shutdown(); @@ -88,10 +89,10 @@ public class TransactorDocTest { friendlyCounter.tell(coordinated.coordinate(new Increment(friend))); coordinated.await(); - Integer count1 = (Integer) Await.result(friendlyCounter.ask("GetCount", timeout), timeout.duration()); + Integer count1 = (Integer) Await.result(ask(friendlyCounter, "GetCount", timeout), timeout.duration()); assertEquals(count1, new Integer(1)); - Integer count2 = (Integer) Await.result(friend.ask("GetCount", timeout), timeout.duration()); + Integer count2 = (Integer) Await.result(ask(friend, "GetCount", timeout), timeout.duration()); assertEquals(count2, new Integer(1)); system.shutdown(); diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java index 7fce881b2c..267cf261b7 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java @@ -18,6 +18,7 @@ import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import akka.dispatch.Await; import akka.dispatch.Future; +import static akka.dispatch.Futures.ask; import akka.testkit.AkkaSpec; import akka.testkit.EventFilter; import akka.testkit.ErrorFilter; @@ -80,7 +81,7 @@ public class UntypedCoordinatedIncrementTest { } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Future future = counter.ask("GetCount", timeout); + Future future = ask(counter, "GetCount", timeout); int count = (Integer) Await.result(future, timeout.duration()); assertEquals(1, count); } @@ -102,7 +103,7 @@ public class UntypedCoordinatedIncrementTest { } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Futurefuture = counter.ask("GetCount", timeout); + Futurefuture = ask(counter, "GetCount", timeout); int count = (Integer) Await.result(future, timeout.duration()); assertEquals(0, count); } diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java index 9e2cf39f8d..3c80d659cf 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java @@ -18,6 +18,7 @@ import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import akka.dispatch.Await; import akka.dispatch.Future; +import static akka.dispatch.Futures.ask; import akka.testkit.AkkaSpec; import akka.testkit.EventFilter; import akka.testkit.ErrorFilter; @@ -81,7 +82,7 @@ public class UntypedTransactorTest { } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Future future = counter.ask("GetCount", timeout); + Future future = ask(counter, "GetCount", timeout); int count = (Integer) Await.result(future, timeout.duration()); assertEquals(1, count); } @@ -103,7 +104,7 @@ public class UntypedTransactorTest { } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Future future = counter.ask("GetCount", timeout); + Future future = ask(counter, "GetCount", timeout); int count = (Integer) Await.result(future, timeout.duration()); assertEquals(0, count); }