fix ActorSelection hashCode/equals and ask, see #3287 #3261

This commit is contained in:
Roland 2013-04-29 22:01:14 +02:00
parent 4d12b8a065
commit 7ea3044af8
5 changed files with 141 additions and 30 deletions

View file

@ -55,9 +55,13 @@ class ActorSelectionSpec extends AkkaSpec("akka.loglevel=DEBUG") with DefaultTim
def identify(selection: ActorSelection): Option[ActorRef] = { def identify(selection: ActorSelection): Option[ActorRef] = {
selection.tell(Identify(selection), idProbe.ref) selection.tell(Identify(selection), idProbe.ref)
idProbe.expectMsgPF() { val result = idProbe.expectMsgPF() {
case ActorIdentity(`selection`, ref) ref case ActorIdentity(`selection`, ref) ref
} }
val asked = Await.result((selection ? Identify(selection)).mapTo[ActorIdentity], timeout.duration)
asked.ref must be(result)
asked.correlationId must be(selection)
result
} }
def identify(path: String): Option[ActorRef] = identify(system.actorSelection(path)) def identify(path: String): Option[ActorRef] = identify(system.actorSelection(path))
@ -287,6 +291,19 @@ class ActorSelectionSpec extends AkkaSpec("akka.loglevel=DEBUG") with DefaultTim
expectNoMsg(1 second) expectNoMsg(1 second)
} }
"compare equally" in {
ActorSelection(c21, "../*/hello") must be === ActorSelection(c21, "../*/hello")
ActorSelection(c21, "../*/hello").## must be === ActorSelection(c21, "../*/hello").##
ActorSelection(c2, "../*/hello") must not be ActorSelection(c21, "../*/hello")
ActorSelection(c2, "../*/hello").## must not be ActorSelection(c21, "../*/hello").##
ActorSelection(c21, "../*/hell") must not be ActorSelection(c21, "../*/hello")
ActorSelection(c21, "../*/hell").## must not be ActorSelection(c21, "../*/hello").##
}
"print nicely" in {
ActorSelection(c21, "../*/hello").toString must be(s"ActorSelection[Actor[akka://ActorSelectionSpec/user/c2/c21#${c21.path.uid}]/../*/hello]")
}
} }
} }

View file

@ -45,14 +45,14 @@ class AskSpec extends AkkaSpec {
f.isCompleted must be(true) f.isCompleted must be(true)
intercept[IllegalArgumentException] { intercept[IllegalArgumentException] {
Await.result(f, remaining) Await.result(f, remaining)
}.getMessage must be === "Unsupported type of ActorRef for the recipient. Question not sent to [null]" }.getMessage must be === "Unsupported recipient ActorRef type, question not sent to [null]"
} }
"return broken promises on 0 timeout" in { "return broken promises on 0 timeout" in {
implicit val timeout = Timeout(0 seconds) implicit val timeout = Timeout(0 seconds)
val echo = system.actorOf(Props(new Actor { def receive = { case x sender ! x } })) val echo = system.actorOf(Props(new Actor { def receive = { case x sender ! x } }))
val f = echo ? "foo" val f = echo ? "foo"
val expectedMsg = "Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format echo val expectedMsg = "Timeout length must not be negative, question not sent to [%s]" format echo
intercept[IllegalArgumentException] { intercept[IllegalArgumentException] {
Await.result(f, remaining) Await.result(f, remaining)
}.getMessage must be === expectedMsg }.getMessage must be === expectedMsg
@ -62,7 +62,7 @@ class AskSpec extends AkkaSpec {
implicit val timeout = Timeout(-1000 seconds) implicit val timeout = Timeout(-1000 seconds)
val echo = system.actorOf(Props(new Actor { def receive = { case x sender ! x } })) val echo = system.actorOf(Props(new Actor { def receive = { case x sender ! x } }))
val f = echo ? "foo" val f = echo ? "foo"
val expectedMsg = "Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format echo val expectedMsg = "Timeout length must not be negative, question not sent to [%s]" format echo
intercept[IllegalArgumentException] { intercept[IllegalArgumentException] {
Await.result(f, remaining) Await.result(f, remaining)
}.getMessage must be === expectedMsg }.getMessage must be === expectedMsg

View file

@ -7,6 +7,8 @@ import language.implicitConversions
import scala.collection.immutable import scala.collection.immutable
import java.util.regex.Pattern import java.util.regex.Pattern
import akka.util.Helpers import akka.util.Helpers
import akka.routing.MurmurHash
import scala.annotation.tailrec
/** /**
* An ActorSelection is a logical view of a section of an ActorSystem's tree of Actors, * An ActorSelection is a logical view of a section of an ActorSystem's tree of Actors,
@ -16,23 +18,25 @@ import akka.util.Helpers
abstract class ActorSelection extends Serializable { abstract class ActorSelection extends Serializable {
this: ScalaActorSelection this: ScalaActorSelection
protected def target: ActorRef import ActorSelection.PatternHolder
protected def path: Array[AnyRef] protected[akka] val anchor: ActorRef
protected val path: immutable.IndexedSeq[AnyRef]
@deprecated("use the two-arg variant (typically getSelf() as second arg)", "2.2") @deprecated("use the two-arg variant (typically getSelf() as second arg)", "2.2")
def tell(msg: Any): Unit = target ! toMessage(msg, path) def tell(msg: Any): Unit = anchor ! toMessage(msg, path)
def tell(msg: Any, sender: ActorRef): Unit = target.tell(toMessage(msg, path), sender) def tell(msg: Any, sender: ActorRef): Unit = anchor.tell(toMessage(msg, path), sender)
private def toMessage(msg: Any, path: Array[AnyRef]): Any = { private def toMessage(msg: Any, path: immutable.IndexedSeq[AnyRef]): Any = {
var acc = msg var acc = msg
var index = path.length - 1 var index = path.length - 1
while (index >= 0) { while (index >= 0) {
acc = path(index) match { acc = path(index) match {
case ".." SelectParent(acc) case ".." SelectParent(acc)
case s: String SelectChildName(s, acc) case s: String SelectChildName(s, acc)
case p: Pattern SelectChildPattern(p, acc) case p: PatternHolder SelectChildPattern(p.pat, acc)
} }
index -= 1 index -= 1
} }
@ -42,19 +46,23 @@ abstract class ActorSelection extends Serializable {
override def toString: String = { override def toString: String = {
val sb = new java.lang.StringBuilder val sb = new java.lang.StringBuilder
sb.append("ActorSelection["). sb.append("ActorSelection[").
append(target.toString). append(anchor.toString).
append(path.mkString("/", "/", "")). append(path.mkString("/", "/", "")).
append("]") append("]")
sb.toString sb.toString
} }
override def equals(obj: Any): Boolean = obj match { override def equals(obj: Any): Boolean = obj match {
case s: ActorSelection this.target == s.target && this.path == s.path case s: ActorSelection this.anchor == s.anchor && this.path == s.path
case _ false case _ false
} }
override def hashCode: Int = override lazy val hashCode: Int = {
37 * (37 * 17 + target.hashCode) + path.hashCode import MurmurHash._
var h = startHash(anchor.##)
h = extendHash(h, path.##, startMagicA, startMagicB)
finalizeHash(h)
}
} }
/** /**
@ -65,18 +73,22 @@ object ActorSelection {
//This cast is safe because the self-type of ActorSelection requires that it mixes in ScalaActorSelection //This cast is safe because the self-type of ActorSelection requires that it mixes in ScalaActorSelection
implicit def toScala(sel: ActorSelection): ScalaActorSelection = sel.asInstanceOf[ScalaActorSelection] implicit def toScala(sel: ActorSelection): ScalaActorSelection = sel.asInstanceOf[ScalaActorSelection]
private case class PatternHolder(str: String) {
val pat = Helpers.makePattern(str)
override def toString = str
}
/** /**
* Construct an ActorSelection from the given string representing a path * Construct an ActorSelection from the given string representing a path
* relative to the given target. This operation has to create all the * relative to the given target. This operation has to create all the
* matching magic, so it is preferable to cache its result if the * matching magic, so it is preferable to cache its result if the
* intention is to send messages frequently. * intention is to send messages frequently.
*/ */
def apply(anchor: ActorRef, path: String): ActorSelection = { def apply(anchorRef: ActorRef, path: String): ActorSelection = {
val elems = path.split("/+").dropWhile(_.isEmpty) val compiled = compile(path.split("/+"))
val compiled: Array[AnyRef] = elems map (x if ((x.indexOf('?') != -1) || (x.indexOf('*') != -1)) Helpers.makePattern(x) else x)
new ActorSelection with ScalaActorSelection { new ActorSelection with ScalaActorSelection {
def target = anchor override val anchor = anchorRef
def path = compiled override val path = compiled
} }
} }
@ -86,16 +98,22 @@ object ActorSelection {
* matching magic, so it is preferable to cache its result if the * matching magic, so it is preferable to cache its result if the
* intention is to send messages frequently. * intention is to send messages frequently.
*/ */
def apply(anchor: ActorRef, elements: immutable.Iterable[String]): ActorSelection = { def apply(anchorRef: ActorRef, elements: immutable.Iterable[String]): ActorSelection = {
// TODO #3073 optimize/align compiled Array val compiled = compile(elements)
val elems: Array[String] = elements.collect { case x if x.nonEmpty x }(collection.breakOut)
val compiled: Array[AnyRef] = elems map (x if ((x.indexOf('?') != -1) || (x.indexOf('*') != -1)) Helpers.makePattern(x) else x)
new ActorSelection with ScalaActorSelection { new ActorSelection with ScalaActorSelection {
override def target = anchor override val anchor = anchorRef
override def path = compiled override val path = compiled
} }
} }
private def compile(in: Iterable[String]): immutable.IndexedSeq[AnyRef] = {
in.iterator.filterNot(_.isEmpty).map { x
if ((x.indexOf('?') != -1) || (x.indexOf('*') != -1))
PatternHolder(x)
else x
}.toVector
}
} }
/** /**

View file

@ -90,6 +90,13 @@ private[akka] trait PathUtils {
} }
} }
/**
* Extractor for so-called relative actor paths as in relative URI, not in
* relative to some actor. Examples:
*
* * "grand/child"
* * "/user/hello/world"
*/
object RelativeActorPath extends PathUtils { object RelativeActorPath extends PathUtils {
def unapply(addr: String): Option[immutable.Seq[String]] = { def unapply(addr: String): Option[immutable.Seq[String]] = {
try { try {

View file

@ -74,6 +74,53 @@ trait AskSupport {
* See [[scala.concurrent.Future]] for a description of `flow` * See [[scala.concurrent.Future]] for a description of `flow`
*/ */
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef ? message def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef ? message
/**
* Import this implicit conversion to gain `?` and `ask` methods on
* [[akka.actor.ActorSelection]], which will defer to the
* `ask(actorSelection, message)(timeout)` method defined here.
*
* {{{
* import akka.pattern.ask
*
* val future = selection ? message // => ask(selection, message)
* val future = selection ask message // => ask(selection, message)
* val future = selection.ask(message)(timeout) // => ask(selection, message)(timeout)
* }}}
*
* All of the above use an implicit [[akka.actor.Timeout]].
*/
implicit def ask(actorSelection: ActorSelection): AskableActorSelection = new AskableActorSelection(actorSelection)
/**
* Sends a message asynchronously and returns a [[scala.concurrent.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* val f = ask(selection, request)(timeout)
* flow {
* EnrichedRequest(request, f())
* } pipeTo nextActor
* }}}
*
* See [[scala.concurrent.Future]] for a description of `flow`
*/
def ask(actorSelection: ActorSelection, message: Any)(implicit timeout: Timeout): Future[Any] = actorSelection ? message
} }
/* /*
@ -84,15 +131,37 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match { def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
case ref: InternalActorRef if ref.isTerminated case ref: InternalActorRef if ref.isTerminated
actorRef ! message actorRef ! message
Future.failed[Any](new AskTimeoutException("Recipient[%s] had already been terminated." format actorRef)) Future.failed[Any](new AskTimeoutException(s"Recipient[$actorRef] had already been terminated."))
case ref: InternalActorRef case ref: InternalActorRef
if (timeout.duration.length <= 0) Future.failed[Any](new IllegalArgumentException("Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format actorRef)) if (timeout.duration.length <= 0)
Future.failed[Any](new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorRef]"))
else { else {
val a = PromiseActorRef(ref.provider, timeout) val a = PromiseActorRef(ref.provider, timeout)
actorRef.tell(message, a) actorRef.tell(message, a)
a.result.future a.result.future
} }
case _ Future.failed[Any](new IllegalArgumentException("Unsupported type of ActorRef for the recipient. Question not sent to [%s]" format actorRef)) case _ Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorRef]"))
}
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)(timeout)
}
/*
* Implementation class of the ask pattern enrichment of ActorSelection
*/
final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal {
def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorSel.anchor match {
case ref: InternalActorRef
if (timeout.duration.length <= 0)
Future.failed[Any](
new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorSel]"))
else {
val a = PromiseActorRef(ref.provider, timeout)
actorSel.tell(message, a)
a.result.future
}
case _ Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorSel]"))
} }
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)(timeout) def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)(timeout)