diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala old mode 100644 new mode 100755 index 0b3215cd60..6af8c36198 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -520,7 +520,7 @@ class LocalActorRefProvider( // case other ⇒ result.success(other) // } //}), systemGuardian, systemGuardian.path / "promise" / tempName(), false, None) - //val ff = system.scheduler.scheduleOnce(t.duration) { b.stop() } + //val ff = system.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) } //result onComplete { _ ⇒ // b.stop() // ff.cancel() diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index e375f29e4b..098099c1d1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -7,6 +7,7 @@ import akka.config.ConfigurationException import akka.actor._ import akka.event._ import akka.dispatch._ +import akka.patterns.ask import akka.util.duration._ import akka.util.Timeout import akka.util.Timeout._ @@ -287,13 +288,9 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor protected def systemImpl = this - @inline private def askAndAwait(actorRef: ActorRef, message: Any)(implicit timeout: akka.util.Timeout): Any = { - Await.result(Futures.ask(actorRef, message), timeout.duration) - } - private[akka] def systemActorOf(props: Props, name: String): ActorRef = { implicit val timeout = settings.CreationTimeout - askAndAwait(systemGuardian, CreateChild(props, name)) match { + Await.result(systemGuardian ? CreateChild(props, name), timeout.duration) match { case ref: ActorRef ⇒ ref case ex: Exception ⇒ throw ex } @@ -301,7 +298,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor def actorOf(props: Props, name: String): ActorRef = { implicit val timeout = settings.CreationTimeout - askAndAwait(guardian, CreateChild(props, name)) match { + Await.result(guardian ? CreateChild(props, name), timeout.duration) match { case ref: ActorRef ⇒ ref case ex: Exception ⇒ throw ex } @@ -309,7 +306,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor def actorOf(props: Props): ActorRef = { implicit val timeout = settings.CreationTimeout - askAndAwait(guardian, CreateRandomNameChild(props)) match { + Await.result(guardian ? CreateRandomNameChild(props), timeout.duration) match { case ref: ActorRef ⇒ ref case ex: Exception ⇒ throw ex } @@ -321,8 +318,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor val guard = guardian.path val sys = systemGuardian.path path.parent match { - case `guard` ⇒ askAndAwait(guardian, StopChild(actor)) - case `sys` ⇒ askAndAwait(systemGuardian, StopChild(actor)) + case `guard` ⇒ Await.result(guardian ? StopChild(actor), timeout.duration) + case `sys` ⇒ Await.result(systemGuardian ? StopChild(actor), timeout.duration) case _ ⇒ actor.asInstanceOf[InternalActorRef].stop() } } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 24ccda0f8d..87fbd98f47 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -396,17 +396,18 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == extension.getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean case "hashCode" ⇒ actor.hashCode.asInstanceOf[AnyRef] case _ ⇒ + import akka.patterns.ask MethodCall(method, args) match { case m if m.isOneWay ⇒ actor ! m; null //Null return value - case m if m.returnsFuture_? ⇒ Futures.ask(actor, m)(timeout) + case m if m.returnsFuture_? ⇒ actor.?(m, timeout) case m if m.returnsJOption_? || m.returnsOption_? ⇒ - val f = Futures.ask(actor, m)(timeout) + val f = actor.?(m, timeout) (try { Await.ready(f, timeout.duration).value } catch { case _: TimeoutException ⇒ None }) match { case None | Some(Right(null)) ⇒ if (m.returnsJOption_?) JOption.none[Any] else None case Some(Right(joption: AnyRef)) ⇒ joption case Some(Left(ex)) ⇒ throw ex } - case m ⇒ Await.result(Futures.ask(actor, m)(timeout), timeout.duration).asInstanceOf[AnyRef] + case m ⇒ Await.result(actor.?(m, timeout), timeout.duration).asInstanceOf[AnyRef] } } } diff --git a/akka-actor/src/main/scala/akka/actor/package.scala b/akka-actor/src/main/scala/akka/actor/package.scala index cedc7befd7..f67a3bb1fa 100644 --- a/akka-actor/src/main/scala/akka/actor/package.scala +++ b/akka-actor/src/main/scala/akka/actor/package.scala @@ -8,9 +8,6 @@ package object actor { implicit def actorRef2Scala(ref: ActorRef): ScalaActorRef = ref.asInstanceOf[ScalaActorRef] implicit def scala2ActorRef(ref: ScalaActorRef): ActorRef = ref.asInstanceOf[ActorRef] - implicit def actorRef2Askable(actorRef: ActorRef) = new dispatch.AskableActorRef(actorRef) - implicit def askable2ActorRef(askable: dispatch.AskableActorRef) = askable.actorRef - type Uuid = com.eaio.uuid.UUID def newUuid(): Uuid = new Uuid() @@ -41,7 +38,17 @@ package object actor { } } - // Implicit for converting a Promise to an actor. +} + +package object patterns { + + import akka.actor.{ ActorRef, InternalActorRef } + import akka.dispatch.Promise + import akka.util.Timeout + + implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef()(actorRef) + + // Implicit for converting a Promise to an ActorRef. // Symmetric to the future2actor conversion, which allows // piping a Future result (read side) to an Actor's mailbox, this // conversion allows using an Actor to complete a Promise (write side) @@ -49,13 +56,13 @@ package object actor { // Future.ask / actor ? message is now a trivial implementation that can // also be done in user code (assuming actorRef, timeout and dispatcher implicits): // - // Future.ask(actor, message) = { + // Patterns.ask(actor, message) = { // val promise = Promise[Any]() // actor ! (message, promise) // promise // } - @inline implicit def promise2actor(promise: akka.dispatch.Promise[Any])(implicit actorRef: ActorRef, timeout: akka.util.Timeout) = { + @inline implicit def promise2actorRef(promise: Promise[Any])(implicit actorRef: ActorRef, timeout: Timeout): ActorRef = { val provider = actorRef.asInstanceOf[InternalActorRef].provider provider.ask(promise, timeout) match { case Some(ref) ⇒ ref @@ -64,3 +71,91 @@ package object actor { } } + +package patterns { + + import akka.actor.{ ActorRef, InternalActorRef } + import akka.dispatch.{ Future, Promise } + import akka.util.Timeout + + final class AskableActorRef(implicit val actorRef: ActorRef) { + + /** + * Akka Java API. + * + * Sends a message asynchronously returns a future holding the eventual reply message. + * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given + * timeout has expired. + * + * NOTE: + * Use this method with care. In most cases it is better to use 'tell' together with the sender + * parameter to implement non-blocking request/response message exchanges. + * + * If you are sending messages using ask and using blocking operations on the Future, such as + * 'get', then you have to use getContext().sender().tell(...) + * in the target actor to send a reply message to the original sender, and thereby completing the Future, + * otherwise the sender will block until the timeout expires. + * + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s reference, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + */ + def ask(message: AnyRef, timeout: Timeout): Future[AnyRef] = ?(message, timeout).asInstanceOf[Future[AnyRef]] + + def ask(message: AnyRef, timeoutMillis: Long): Future[AnyRef] = ask(message, new Timeout(timeoutMillis)) + + /** + * Sends a message asynchronously, returning a future which may eventually hold the reply. + * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given + * timeout has expired. + * + * NOTE: + * Use this method with care. In most cases it is better to use '!' together with implicit or explicit + * sender parameter to implement non-blocking request/response message exchanges. + * + * If you are sending messages using ask and using blocking operations on the Future, such as + * 'get', then you have to use getContext().sender().tell(...) + * in the target actor to send a reply message to the original sender, and thereby completing the Future, + * otherwise the sender will block until the timeout expires. + * + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s reference, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + */ + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { + implicit val dispatcher = actorRef.asInstanceOf[InternalActorRef].provider.dispatcher + val promise = Promise[Any]() + actorRef.!(message)(promise) + promise + } + + /** + * Sends a message asynchronously, returning a future which may eventually hold the reply. + * The implicit parameter with the default value is just there to disambiguate it from the version that takes the + * implicit timeout + */ + def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) + } + +} + +object Patterns { + + import akka.actor.ActorRef + import akka.dispatch.Future + import akka.patterns.{ ask => actorRef2Askable } + import akka.util.Timeout + + def ask(actor: ActorRef, message: Any, timeout: Timeout): Future[Any] = + actorRef2Askable(actor).?(message)(timeout) + + def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[Any] = + actorRef2Askable(actor).?(message)(new Timeout(timeoutMillis)) + +} diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 6eb408fd7a..fea97fbaf3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, import akka.dispatch.Await.CanAwait import java.util.concurrent._ import akka.actor.ActorSystem -import akka.actor.{ ActorRef, InternalActorRef } object Await { sealed trait CanAwait @@ -54,20 +53,6 @@ object Await { */ object Futures { - def ask(actor: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = { - implicit val dispatcher = actor.asInstanceOf[InternalActorRef].provider.dispatcher - implicit val actorRefContext = actor // for promise2actor implicit conversion - val promise = Promise[Any]() - actor.!(message)(promise) - promise - } - - def ask(actor: ActorRef, message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = - ask(actor, message)(timeout) - - def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[Any] = - ask(actor, message)(new Timeout(timeoutMillis)) - /** * Java API, equivalent to Future.apply */ @@ -149,66 +134,6 @@ object Futures { } } -final class AskableActorRef(val actorRef: ActorRef) { - - /** - * Akka Java API. - * - * Sends a message asynchronously returns a future holding the eventual reply message. - * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given - * timeout has expired. - * - * NOTE: - * Use this method with care. In most cases it is better to use 'tell' together with the sender - * parameter to implement non-blocking request/response message exchanges. - * - * If you are sending messages using ask and using blocking operations on the Future, such as - * 'get', then you have to use getContext().sender().tell(...) - * in the target actor to send a reply message to the original sender, and thereby completing the Future, - * otherwise the sender will block until the timeout expires. - * - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s reference, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - */ - def ask(message: AnyRef, timeout: Timeout): Future[AnyRef] = ?(message, timeout).asInstanceOf[Future[AnyRef]] - - def ask(message: AnyRef, timeoutMillis: Long): Future[AnyRef] = ask(message, new Timeout(timeoutMillis)) - - /** - * Sends a message asynchronously, returning a future which may eventually hold the reply. - * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given - * timeout has expired. - * - * NOTE: - * Use this method with care. In most cases it is better to use '!' together with implicit or explicit - * sender parameter to implement non-blocking request/response message exchanges. - * - * If you are sending messages using ask and using blocking operations on the Future, such as - * 'get', then you have to use getContext().sender().tell(...) - * in the target actor to send a reply message to the original sender, and thereby completing the Future, - * otherwise the sender will block until the timeout expires. - * - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s reference, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - */ - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = Futures.ask(actorRef, message) - - /** - * Sends a message asynchronously, returning a future which may eventually hold the reply. - * The implicit parameter with the default value is just there to disambiguate it from the version that takes the - * implicit timeout - */ - def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) -} - object Future { /** diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 4099dc06b5..bb0f881c94 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -14,7 +14,7 @@ import akka.util.Timeout import java.util.concurrent.atomic.AtomicInteger import scala.util.control.NoStackTrace import java.util.concurrent.TimeoutException -import akka.dispatch.{ Await, Futures } +import akka.dispatch.Await object LoggingBus { implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream @@ -158,7 +158,8 @@ trait LoggingBus extends ActorEventBus { val name = "log" + Extension(system).id() + "-" + simpleName(clazz) val actor = system.systemActorOf(Props(clazz), name) implicit val timeout = Timeout(3 seconds) - val response = try Await.result(Futures.ask(actor, InitializeLogger(this)), timeout.duration) catch { + import akka.patterns.ask + val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch { case _: TimeoutException ⇒ publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) }