diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSelectionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSelectionSpec.scala index a3a09b9575..df1c3471ff 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSelectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSelectionSpec.scala @@ -55,9 +55,13 @@ class ActorSelectionSpec extends AkkaSpec("akka.loglevel=DEBUG") with DefaultTim def identify(selection: ActorSelection): Option[ActorRef] = { selection.tell(Identify(selection), idProbe.ref) - idProbe.expectMsgPF() { + val result = idProbe.expectMsgPF() { case ActorIdentity(`selection`, ref) ⇒ ref } + val asked = Await.result((selection ? Identify(selection)).mapTo[ActorIdentity], timeout.duration) + asked.ref must be(result) + asked.correlationId must be(selection) + result } def identify(path: String): Option[ActorRef] = identify(system.actorSelection(path)) @@ -287,6 +291,19 @@ class ActorSelectionSpec extends AkkaSpec("akka.loglevel=DEBUG") with DefaultTim expectNoMsg(1 second) } + "compare equally" in { + ActorSelection(c21, "../*/hello") must be === ActorSelection(c21, "../*/hello") + ActorSelection(c21, "../*/hello").## must be === ActorSelection(c21, "../*/hello").## + ActorSelection(c2, "../*/hello") must not be ActorSelection(c21, "../*/hello") + ActorSelection(c2, "../*/hello").## must not be ActorSelection(c21, "../*/hello").## + ActorSelection(c21, "../*/hell") must not be ActorSelection(c21, "../*/hello") + ActorSelection(c21, "../*/hell").## must not be ActorSelection(c21, "../*/hello").## + } + + "print nicely" in { + ActorSelection(c21, "../*/hello").toString must be(s"ActorSelection[Actor[akka://ActorSelectionSpec/user/c2/c21#${c21.path.uid}]/../*/hello]") + } + } } diff --git a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala index 73e707e3ac..57431242ed 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala @@ -45,14 +45,14 @@ class AskSpec extends AkkaSpec { f.isCompleted must be(true) intercept[IllegalArgumentException] { Await.result(f, remaining) - }.getMessage must be === "Unsupported type of ActorRef for the recipient. Question not sent to [null]" + }.getMessage must be === "Unsupported recipient ActorRef type, question not sent to [null]" } "return broken promises on 0 timeout" in { implicit val timeout = Timeout(0 seconds) val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender ! x } })) val f = echo ? "foo" - val expectedMsg = "Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format echo + val expectedMsg = "Timeout length must not be negative, question not sent to [%s]" format echo intercept[IllegalArgumentException] { Await.result(f, remaining) }.getMessage must be === expectedMsg @@ -62,7 +62,7 @@ class AskSpec extends AkkaSpec { implicit val timeout = Timeout(-1000 seconds) val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender ! x } })) val f = echo ? "foo" - val expectedMsg = "Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format echo + val expectedMsg = "Timeout length must not be negative, question not sent to [%s]" format echo intercept[IllegalArgumentException] { Await.result(f, remaining) }.getMessage must be === expectedMsg diff --git a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala index 64f47a682c..3bdabd0e2c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala @@ -7,6 +7,8 @@ import language.implicitConversions import scala.collection.immutable import java.util.regex.Pattern import akka.util.Helpers +import akka.routing.MurmurHash +import scala.annotation.tailrec /** * An ActorSelection is a logical view of a section of an ActorSystem's tree of Actors, @@ -16,23 +18,25 @@ import akka.util.Helpers abstract class ActorSelection extends Serializable { this: ScalaActorSelection ⇒ - protected def target: ActorRef + import ActorSelection.PatternHolder - protected def path: Array[AnyRef] + protected[akka] val anchor: ActorRef + + protected val path: immutable.IndexedSeq[AnyRef] @deprecated("use the two-arg variant (typically getSelf() as second arg)", "2.2") - def tell(msg: Any): Unit = target ! toMessage(msg, path) + def tell(msg: Any): Unit = anchor ! toMessage(msg, path) - def tell(msg: Any, sender: ActorRef): Unit = target.tell(toMessage(msg, path), sender) + def tell(msg: Any, sender: ActorRef): Unit = anchor.tell(toMessage(msg, path), sender) - private def toMessage(msg: Any, path: Array[AnyRef]): Any = { + private def toMessage(msg: Any, path: immutable.IndexedSeq[AnyRef]): Any = { var acc = msg var index = path.length - 1 while (index >= 0) { acc = path(index) match { - case ".." ⇒ SelectParent(acc) - case s: String ⇒ SelectChildName(s, acc) - case p: Pattern ⇒ SelectChildPattern(p, acc) + case ".." ⇒ SelectParent(acc) + case s: String ⇒ SelectChildName(s, acc) + case p: PatternHolder ⇒ SelectChildPattern(p.pat, acc) } index -= 1 } @@ -42,19 +46,23 @@ abstract class ActorSelection extends Serializable { override def toString: String = { val sb = new java.lang.StringBuilder sb.append("ActorSelection["). - append(target.toString). + append(anchor.toString). append(path.mkString("/", "/", "")). append("]") sb.toString } override def equals(obj: Any): Boolean = obj match { - case s: ActorSelection ⇒ this.target == s.target && this.path == s.path + case s: ActorSelection ⇒ this.anchor == s.anchor && this.path == s.path case _ ⇒ false } - override def hashCode: Int = - 37 * (37 * 17 + target.hashCode) + path.hashCode + override lazy val hashCode: Int = { + import MurmurHash._ + var h = startHash(anchor.##) + h = extendHash(h, path.##, startMagicA, startMagicB) + finalizeHash(h) + } } /** @@ -65,18 +73,22 @@ object ActorSelection { //This cast is safe because the self-type of ActorSelection requires that it mixes in ScalaActorSelection implicit def toScala(sel: ActorSelection): ScalaActorSelection = sel.asInstanceOf[ScalaActorSelection] + private case class PatternHolder(str: String) { + val pat = Helpers.makePattern(str) + override def toString = str + } + /** * Construct an ActorSelection from the given string representing a path * relative to the given target. This operation has to create all the * matching magic, so it is preferable to cache its result if the * intention is to send messages frequently. */ - def apply(anchor: ActorRef, path: String): ActorSelection = { - val elems = path.split("/+").dropWhile(_.isEmpty) - val compiled: Array[AnyRef] = elems map (x ⇒ if ((x.indexOf('?') != -1) || (x.indexOf('*') != -1)) Helpers.makePattern(x) else x) + def apply(anchorRef: ActorRef, path: String): ActorSelection = { + val compiled = compile(path.split("/+")) new ActorSelection with ScalaActorSelection { - def target = anchor - def path = compiled + override val anchor = anchorRef + override val path = compiled } } @@ -86,16 +98,22 @@ object ActorSelection { * matching magic, so it is preferable to cache its result if the * intention is to send messages frequently. */ - def apply(anchor: ActorRef, elements: immutable.Iterable[String]): ActorSelection = { - // TODO #3073 optimize/align compiled Array - val elems: Array[String] = elements.collect { case x if x.nonEmpty ⇒ x }(collection.breakOut) - val compiled: Array[AnyRef] = elems map (x ⇒ if ((x.indexOf('?') != -1) || (x.indexOf('*') != -1)) Helpers.makePattern(x) else x) + def apply(anchorRef: ActorRef, elements: immutable.Iterable[String]): ActorSelection = { + val compiled = compile(elements) new ActorSelection with ScalaActorSelection { - override def target = anchor - override def path = compiled + override val anchor = anchorRef + override val path = compiled } } + private def compile(in: Iterable[String]): immutable.IndexedSeq[AnyRef] = { + in.iterator.filterNot(_.isEmpty).map { x ⇒ + if ((x.indexOf('?') != -1) || (x.indexOf('*') != -1)) + PatternHolder(x) + else x + }.toVector + } + } /** diff --git a/akka-actor/src/main/scala/akka/actor/Address.scala b/akka-actor/src/main/scala/akka/actor/Address.scala index 8bde7ef8cc..16db6c2c8d 100644 --- a/akka-actor/src/main/scala/akka/actor/Address.scala +++ b/akka-actor/src/main/scala/akka/actor/Address.scala @@ -90,6 +90,13 @@ private[akka] trait PathUtils { } } +/** + * Extractor for so-called “relative actor paths” as in “relative URI”, not in + * “relative to some actor”. Examples: + * + * * "grand/child" + * * "/user/hello/world" + */ object RelativeActorPath extends PathUtils { def unapply(addr: String): Option[immutable.Seq[String]] = { try { diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 0a26eca0ca..1905d73533 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -74,6 +74,53 @@ trait AskSupport { * See [[scala.concurrent.Future]] for a description of `flow` */ def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef ? message + + /** + * Import this implicit conversion to gain `?` and `ask` methods on + * [[akka.actor.ActorSelection]], which will defer to the + * `ask(actorSelection, message)(timeout)` method defined here. + * + * {{{ + * import akka.pattern.ask + * + * val future = selection ? message // => ask(selection, message) + * val future = selection ask message // => ask(selection, message) + * val future = selection.ask(message)(timeout) // => ask(selection, message)(timeout) + * }}} + * + * All of the above use an implicit [[akka.actor.Timeout]]. + */ + implicit def ask(actorSelection: ActorSelection): AskableActorSelection = new AskableActorSelection(actorSelection) + + /** + * Sends a message asynchronously and returns a [[scala.concurrent.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * val f = ask(selection, request)(timeout) + * flow { + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * See [[scala.concurrent.Future]] for a description of `flow` + */ + def ask(actorSelection: ActorSelection, message: Any)(implicit timeout: Timeout): Future[Any] = actorSelection ? message } /* @@ -84,15 +131,37 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal { def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match { case ref: InternalActorRef if ref.isTerminated ⇒ actorRef ! message - Future.failed[Any](new AskTimeoutException("Recipient[%s] had already been terminated." format actorRef)) + Future.failed[Any](new AskTimeoutException(s"Recipient[$actorRef] had already been terminated.")) case ref: InternalActorRef ⇒ - if (timeout.duration.length <= 0) Future.failed[Any](new IllegalArgumentException("Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format actorRef)) + if (timeout.duration.length <= 0) + Future.failed[Any](new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorRef]")) else { val a = PromiseActorRef(ref.provider, timeout) actorRef.tell(message, a) a.result.future } - case _ ⇒ Future.failed[Any](new IllegalArgumentException("Unsupported type of ActorRef for the recipient. Question not sent to [%s]" format actorRef)) + case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorRef]")) + } + + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)(timeout) +} + +/* + * Implementation class of the “ask” pattern enrichment of ActorSelection + */ +final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal { + + def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorSel.anchor match { + case ref: InternalActorRef ⇒ + if (timeout.duration.length <= 0) + Future.failed[Any]( + new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorSel]")) + else { + val a = PromiseActorRef(ref.provider, timeout) + actorSel.tell(message, a) + a.result.future + } + case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorSel]")) } def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)(timeout)