diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 7dfb0b2047..76f4b7f729 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -124,59 +124,6 @@ case object ReceiveTimeout extends ReceiveTimeout { def getInstance = this } -/** - * INTERNAL API - * ActorRefFactory.actorSelection returns a special ref which sends these - * nested path descriptions whenever using ! on them, the idea being that the - * message is delivered by active routing of the various actors involved. - */ -private[akka] sealed trait SelectionPath extends AutoReceivedMessage with PossiblyHarmful - -/** - * INTERNAL API - */ -@SerialVersionUID(1L) -private[akka] case class SelectChildName(name: String, next: Any) extends SelectionPath { - - def wrappedMessage: Any = { - @tailrec def rec(nx: Any): Any = nx match { - case SelectChildName(_, n) ⇒ rec(n) - case SelectChildPattern(_, n) ⇒ rec(n) - case SelectParent(n) ⇒ rec(n) - case x ⇒ x - } - rec(next) - } - - def identifyRequest: Option[Identify] = wrappedMessage match { - case x: Identify ⇒ Some(x) - case _ ⇒ None - } - - def allChildNames: immutable.Iterable[String] = { - @tailrec def rec(nx: Any, acc: List[String]): List[String] = nx match { - case SelectChildName(name, n) ⇒ rec(n, name :: acc) - case SelectChildPattern(_, n) ⇒ throw new IllegalArgumentException("SelectChildPattern not allowed") - case SelectParent(n) if acc.isEmpty ⇒ rec(n, acc) - case SelectParent(n) ⇒ rec(n, acc.tail) - case _ ⇒ acc - } - rec(this, Nil).reverse - } -} - -/** - * INTERNAL API - */ -@SerialVersionUID(1L) -private[akka] case class SelectChildPattern(pattern: Pattern, next: Any) extends SelectionPath - -/** - * INTERNAL API - */ -@SerialVersionUID(1L) -private[akka] case class SelectParent(next: Any) extends SelectionPath - /** * IllegalActorStateException is thrown when a core invariant in the Actor implementation has been violated. * For instance, if you try to create an Actor that doesn't extend Actor. diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 1cec62fbcd..cc4098ead7 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -243,6 +243,12 @@ private[akka] trait Cell { */ def getChildByName(name: String): Option[ChildStats] + /** + * Method for looking up a single child beneath this actor. + * It is racy if called from the outside. + */ + def getSingleChild(name: String): InternalActorRef + /** * Enqueue a message to be sent to the actor; may or may not actually * schedule the actor to run, depending on which type of cell it is. @@ -471,30 +477,17 @@ private[akka] class ActorCell( case AddressTerminated(address) ⇒ addressTerminated(address) case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() - case SelectParent(m) ⇒ - if (self == system.provider.rootGuardian) self.tell(m, msg.sender) - else parent.tell(m, msg.sender) - case s @ SelectChildName(name, m) ⇒ - def selectChild(): Unit = { - getChildByName(name) match { - case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender) - case _ ⇒ - s.identifyRequest foreach { x ⇒ sender ! ActorIdentity(x.messageId, None) } - } - } - // need this special case because of extraNames handled by rootGuardian - if (self == system.provider.rootGuardian) { - self.asInstanceOf[LocalActorRef].getSingleChild(name) match { - case Nobody ⇒ selectChild() - case child ⇒ child.tell(m, msg.sender) - } - } else - selectChild() - case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) - case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, Some(self)) + case sel: ActorSelectionMessage ⇒ receiveSelection(sel) + case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, Some(self)) } } + private def receiveSelection(sel: ActorSelectionMessage): Unit = + if (sel.elements.isEmpty) + invoke(Envelope(sel.msg, sender, system)) + else + ActorSelection.deliverSelection(self, sender, sel) + final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg) /* diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index c93e5b41eb..2ddbcd3d7f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -4,6 +4,7 @@ package akka.actor +import scala.collection.immutable import akka.dispatch._ import akka.dispatch.sysmsg._ import java.lang.{ UnsupportedOperationException, IllegalStateException } @@ -266,6 +267,8 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe */ private[akka] abstract class ActorRefWithCell extends InternalActorRef { this: ActorRefScope ⇒ def underlying: Cell + def children: immutable.Iterable[ActorRef] + def getSingleChild(name: String): InternalActorRef } /** @@ -343,19 +346,14 @@ private[akka] class LocalActorRef private[akka] ( override def provider: ActorRefProvider = actorCell.provider + def children: immutable.Iterable[ActorRef] = actorCell.children + /** * Method for looking up a single child beneath this actor. Override in order * to inject “synthetic” actor paths like “/temp”. * It is racy if called from the outside. */ - def getSingleChild(name: String): InternalActorRef = { - val (childName, uid) = ActorCell.splitNameAndUid(name) - actorCell.getChildByName(childName) match { - case Some(crs: ChildRestartStats) if uid == ActorCell.undefinedUid || uid == crs.uid ⇒ - crs.child.asInstanceOf[InternalActorRef] - case _ ⇒ Nobody - } - } + def getSingleChild(name: String): InternalActorRef = actorCell.getSingleChild(name) override def getChild(names: Iterator[String]): InternalActorRef = { /* @@ -507,11 +505,11 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider, case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, None) true - case s: SelectChildName ⇒ - s.identifyRequest match { + case sel: ActorSelectionMessage ⇒ + sel.identifyRequest match { case Some(identify) ⇒ sender ! ActorIdentity(identify.messageId, None) case None ⇒ - eventStream.publish(DeadLetter(s.wrappedMessage, if (sender eq Actor.noSender) provider.deadLetters else sender, this)) + eventStream.publish(DeadLetter(sel.msg, if (sender eq Actor.noSender) provider.deadLetters else sender, this)) } true case _ ⇒ false diff --git a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala index 7312043480..7b2b542dd8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala @@ -26,27 +26,16 @@ import akka.dispatch.ExecutionContexts abstract class ActorSelection extends Serializable { this: ScalaActorSelection ⇒ - import ActorSelection.PatternHolder - protected[akka] val anchor: ActorRef - protected val path: immutable.IndexedSeq[AnyRef] + protected val path: immutable.IndexedSeq[SelectionPathElement] @deprecated("use the two-arg variant (typically getSelf() as second arg)", "2.2") def tell(msg: Any): Unit = tell(msg, Actor.noSender) - def tell(msg: Any, sender: ActorRef): Unit = { - @tailrec def toMessage(msg: Any, path: immutable.IndexedSeq[AnyRef], index: Int): Any = - if (index < 0) msg - else toMessage( - path(index) match { - case ".." ⇒ SelectParent(msg) - case s: String ⇒ SelectChildName(s, msg) - case p: PatternHolder ⇒ SelectChildPattern(p.pat, msg) - }, path, index - 1) - - anchor.tell(toMessage(msg, path, path.length - 1), sender) - } + def tell(msg: Any, sender: ActorRef): Unit = + ActorSelection.deliverSelection(anchor.asInstanceOf[InternalActorRef], sender, + ActorSelectionMessage(msg, path)) /** * Resolve the [[ActorRef]] matching this selection. @@ -108,11 +97,6 @@ object ActorSelection { //This cast is safe because the self-type of ActorSelection requires that it mixes in ScalaActorSelection implicit def toScala(sel: ActorSelection): ScalaActorSelection = sel.asInstanceOf[ScalaActorSelection] - @SerialVersionUID(1L) private case class PatternHolder(str: String) { - val pat = Helpers.makePattern(str) - override def toString = str - } - /** * Construct an ActorSelection from the given string representing a path * relative to the given target. This operation has to create all the @@ -128,14 +112,68 @@ object ActorSelection { * intention is to send messages frequently. */ def apply(anchorRef: ActorRef, elements: Iterable[String]): ActorSelection = { - val compiled: immutable.IndexedSeq[AnyRef] = elements.collect({ - case x if !x.isEmpty ⇒ if ((x.indexOf('?') != -1) || (x.indexOf('*') != -1)) PatternHolder(x) else x + val compiled: immutable.IndexedSeq[SelectionPathElement] = elements.collect({ + case x if !x.isEmpty ⇒ + if ((x.indexOf('?') != -1) || (x.indexOf('*') != -1)) SelectChildPattern(x) + else if (x == "..") SelectParent + else SelectChildName(x) })(scala.collection.breakOut) new ActorSelection with ScalaActorSelection { override val anchor = anchorRef override val path = compiled } } + + /** + * INTERNAL API + * The receive logic for ActorSelectionMessage. The idea is to recursively descend as far as possible + * with local refs and hand over to that “foreign” child when we encounter it. + */ + private[akka] def deliverSelection(anchor: InternalActorRef, sender: ActorRef, sel: ActorSelectionMessage): Unit = + if (sel.elements.isEmpty) + anchor.tell(sel.msg, sender) + else { + val iter = sel.elements.iterator + + @tailrec def rec(ref: InternalActorRef): Unit = { + ref match { + case refWithCell: ActorRefWithCell ⇒ + iter.next() match { + case SelectParent ⇒ + val parent = ref.getParent + if (iter.isEmpty) + parent.tell(sel.msg, sender) + else + rec(parent) + case SelectChildName(name) ⇒ + val child = refWithCell.getSingleChild(name) + if (child == Nobody) + sel.identifyRequest foreach { x ⇒ sender ! ActorIdentity(x.messageId, None) } + else if (iter.isEmpty) + child.tell(sel.msg, sender) + else + rec(child) + case p: SelectChildPattern ⇒ + // fan-out when there is a wildcard + val chldr = refWithCell.children + if (iter.isEmpty) + for (c ← chldr if p.pattern.matcher(c.path.name).matches) + c.tell(sel.msg, sender) + else { + val m = sel.copy(elements = iter.toVector) + for (c ← chldr if p.pattern.matcher(c.path.name).matches) + deliverSelection(c.asInstanceOf[InternalActorRef], sender, m) + } + } + + case _ ⇒ + // foreign ref, continue by sending ActorSelectionMessage to it with remaining elements + ref.tell(sel.copy(elements = iter.toVector), sender) + } + } + + rec(anchor) + } } /** @@ -148,6 +186,53 @@ trait ScalaActorSelection { def !(msg: Any)(implicit sender: ActorRef = Actor.noSender) = tell(msg, sender) } +/** + * INTERNAL API + * ActorRefFactory.actorSelection returns a ActorSelection which sends these + * nested path descriptions whenever using ! on them, the idea being that the + * message is delivered by traversing the various actor paths involved. + */ +@SerialVersionUID(1L) +private[akka] case class ActorSelectionMessage(msg: Any, elements: immutable.Iterable[SelectionPathElement]) + extends AutoReceivedMessage with PossiblyHarmful { + + def identifyRequest: Option[Identify] = msg match { + case x: Identify ⇒ Some(x) + case _ ⇒ None + } +} + +/** + * INTERNAL API + */ +@SerialVersionUID(1L) +private[akka] sealed trait SelectionPathElement + +/** + * INTERNAL API + */ +@SerialVersionUID(2L) +private[akka] case class SelectChildName(name: String) extends SelectionPathElement { + override def toString: String = name +} + +/** + * INTERNAL API + */ +@SerialVersionUID(2L) +private[akka] case class SelectChildPattern(patternStr: String) extends SelectionPathElement { + val pattern: Pattern = Helpers.makePattern(patternStr) + override def toString: String = patternStr +} + +/** + * INTERNAL API + */ +@SerialVersionUID(2L) +private[akka] case object SelectParent extends SelectionPathElement { + override def toString: String = ".." +} + /** * When [[ActorSelection#resolveOne]] can't identify the actor the * `Future` is completed with this failure. diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index ea5b54eedc..c39ae10f34 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import scala.annotation.tailrec +import scala.collection.immutable import akka.actor.dungeon.ChildrenContainer import akka.event.Logging.Warning @@ -154,6 +155,14 @@ private[akka] class RepointableActorRef( } } else this + /** + * Method for looking up a single child beneath this actor. + * It is racy if called from the outside. + */ + def getSingleChild(name: String): InternalActorRef = lookup.getSingleChild(name) + + def children: immutable.Iterable[ActorRef] = lookup.childrenRefs.children + def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = underlying.sendMessage(message, sender) def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message) @@ -204,6 +213,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, def parent: InternalActorRef = supervisor def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer def getChildByName(name: String): Option[ChildRestartStats] = None + override def getSingleChild(name: String): InternalActorRef = Nobody def sendMessage(msg: Envelope): Unit = { if (lock.tryLock(timeout.length, timeout.unit)) { diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index babbc71c1d..8d5c21212d 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -135,6 +135,22 @@ private[akka] trait Children { this: ActorCell ⇒ protected def getAllChildStats: immutable.Iterable[ChildRestartStats] = childrenRefs.stats + override def getSingleChild(name: String): InternalActorRef = + if (name.indexOf('#') == -1) { + // optimization for the non-uid case + getChildByName(name) match { + case Some(crs: ChildRestartStats) ⇒ crs.child.asInstanceOf[InternalActorRef] + case _ ⇒ Nobody + } + } else { + val (childName, uid) = ActorCell.splitNameAndUid(name) + getChildByName(childName) match { + case Some(crs: ChildRestartStats) if uid == ActorCell.undefinedUid || uid == crs.uid ⇒ + crs.child.asInstanceOf[InternalActorRef] + case _ ⇒ Nobody + } + } + protected def removeChildAndGetStateChange(child: ActorRef): Option[SuspendReason] = { @tailrec def removeChild(ref: ActorRef): ChildrenContainer = { val c = childrenRefs diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 6f181e733f..22387f0b1f 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -23,7 +23,7 @@ akka { # Since com.google.protobuf.Message does not extend Serializable but # GeneratedMessage does, need to use the more specific one here in order # to avoid ambiguity - "akka.actor.SelectionPath" = akka-containers + "akka.actor.ActorSelectionMessage" = akka-containers "com.google.protobuf.GeneratedMessage" = proto "akka.remote.DaemonMsgCreate" = daemon-create } diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 0301332955..89c52ea35a 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -74,7 +74,10 @@ private[remote] class DefaultMessageDispatcher(private val system: ExtendedActor case msg: PossiblyHarmful if UntrustedMode ⇒ log.debug("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type {}", msg.getClass) case msg: SystemMessage ⇒ l.sendSystemMessage(msg) - case msg ⇒ l.!(msg)(sender) + case sel: ActorSelectionMessage ⇒ + // run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor + ActorSelection.deliverSelection(l, sender, sel) + case msg ⇒ l.!(msg)(sender) } case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 38291f0f7c..082c46b071 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -13,6 +13,7 @@ import akka.actor.ActorRefWithCell import akka.actor.ActorRefScope import akka.util.Switch import akka.actor.RootActorPath +import akka.actor.ActorSelectionMessage import akka.actor.SelectParent import akka.actor.SelectChildName import akka.actor.SelectChildPattern @@ -120,18 +121,30 @@ private[akka] class RemoteSystemDaemon( } } - case SelectParent(m) ⇒ getParent.tell(m, sender) - - case s @ SelectChildName(name, m) ⇒ - getChild(s.allChildNames.iterator) match { - case Nobody ⇒ - s.identifyRequest foreach { x ⇒ sender ! ActorIdentity(x.messageId, None) } - case child ⇒ - child.tell(s.wrappedMessage, sender) + case sel: ActorSelectionMessage ⇒ + val (concatenatedChildNames, m) = { + val iter = sel.elements.iterator + // find child elements, and the message to send, which is a remaining ActorSelectionMessage + // in case of SelectChildPattern, otherwise the the actual message of the selection + @tailrec def rec(acc: List[String]): (List[String], Any) = + if (iter.isEmpty) + (acc.reverse, sel.msg) + else { + iter.next() match { + case SelectChildName(name) ⇒ rec(name :: acc) + case SelectParent if acc.isEmpty ⇒ rec(acc) + case SelectParent ⇒ rec(acc.tail) + case pat: SelectChildPattern ⇒ (acc.reverse, sel.copy(elements = pat +: iter.toVector)) + } + } + rec(Nil) + } + getChild(concatenatedChildNames.iterator) match { + case Nobody ⇒ + sel.identifyRequest foreach { x ⇒ sender ! ActorIdentity(x.messageId, None) } + case child ⇒ + child.tell(m, sender) } - - case SelectChildPattern(p, m) ⇒ - log.error("SelectChildPattern not allowed in actorSelection of remote deployed actors") case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, Some(this)) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteMetricsExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteMetricsExtension.scala index cbdb96da35..fc6461a10a 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteMetricsExtension.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteMetricsExtension.scala @@ -5,12 +5,12 @@ package akka.remote import java.util.concurrent.ConcurrentHashMap import scala.annotation.tailrec +import akka.actor.ActorSelectionMessage import akka.actor.ActorSystem import akka.actor.ExtendedActorSystem import akka.actor.Extension import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider -import akka.actor.SelectChildName import akka.event.Logging import akka.routing.RouterEnvelope @@ -63,9 +63,9 @@ private[akka] class RemoteMetricsOn(system: ExtendedActorSystem) extends RemoteM override def logPayloadBytes(msg: Any, payloadBytes: Int): Unit = if (payloadBytes >= logFrameSizeExceeding) { val clazz = msg match { - case x: SelectChildName ⇒ x.wrappedMessage.getClass - case x: RouterEnvelope ⇒ x.message.getClass - case _ ⇒ msg.getClass + case x: ActorSelectionMessage ⇒ x.msg.getClass + case x: RouterEnvelope ⇒ x.message.getClass + case _ ⇒ msg.getClass } // 10% threshold until next log diff --git a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala index ebda4517c8..d1c4fe5842 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala @@ -3,18 +3,17 @@ */ package akka.remote.serialization -import akka.serialization.{ SerializationExtension, Serializer } -import akka.actor._ -import scala.annotation.tailrec -import akka.remote.ContainerFormats -import akka.remote.ContainerFormats.SelectionEnvelope -import akka.remote.WireFormats.SerializedMessage +import scala.collection.immutable import com.google.protobuf.ByteString +import akka.actor.ActorSelectionMessage +import akka.actor.ExtendedActorSystem +import akka.actor.SelectChildName import akka.actor.SelectChildPattern import akka.actor.SelectParent -import akka.actor.SelectChildName -import scala.Some -import java.util.regex.Pattern +import akka.actor.SelectionPathElement +import akka.remote.ContainerFormats +import akka.serialization.SerializationExtension +import akka.serialization.Serializer class MessageContainerSerializer(val system: ExtendedActorSystem) extends Serializer { @@ -23,28 +22,32 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends Serial def includeManifest: Boolean = false def toBinary(obj: AnyRef): Array[Byte] = obj match { - case path: SelectionPath ⇒ serializeSelectionPath(path, ContainerFormats.SelectionEnvelope.newBuilder()).build().toByteArray - case _ ⇒ throw new IllegalArgumentException(s"Cannot serialize object of type [${obj.getClass.getName}]") + case sel: ActorSelectionMessage ⇒ serializeSelection(sel) + case _ ⇒ throw new IllegalArgumentException(s"Cannot serialize object of type [${obj.getClass.getName}]") } import ContainerFormats.PatternType._ - @tailrec - private def serializeSelectionPath(path: Any, builder: SelectionEnvelope.Builder): SelectionEnvelope.Builder = path match { - case SelectChildName(name, next) ⇒ - serializeSelectionPath(next, builder.addPattern(buildPattern(Some(name), CHILD_NAME))) - case SelectChildPattern(pattern, next) ⇒ - serializeSelectionPath(next, builder.addPattern(buildPattern(Some(pattern.toString), CHILD_PATTERN))) - case SelectParent(next) ⇒ - serializeSelectionPath(next, builder.addPattern(buildPattern(None, PARENT))) - case message: AnyRef ⇒ - val serializer = SerializationExtension(system).findSerializerFor(message) - builder - .setEnclosedMessage(ByteString.copyFrom(serializer.toBinary(message))) - .setSerializerId(serializer.identifier) - if (serializer.includeManifest) - builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) - builder + private def serializeSelection(sel: ActorSelectionMessage): Array[Byte] = { + val builder = ContainerFormats.SelectionEnvelope.newBuilder() + val message = sel.msg.asInstanceOf[AnyRef] + val serializer = SerializationExtension(system).findSerializerFor(message) + builder. + setEnclosedMessage(ByteString.copyFrom(serializer.toBinary(message))). + setSerializerId(serializer.identifier) + if (serializer.includeManifest) + builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) + + sel.elements.foreach { + case SelectChildName(name) ⇒ + builder.addPattern(buildPattern(Some(name), CHILD_NAME)) + case SelectChildPattern(patternStr) ⇒ + builder.addPattern(buildPattern(Some(patternStr), CHILD_PATTERN)) + case SelectParent ⇒ + builder.addPattern(buildPattern(None, PARENT)) + } + + builder.build().toByteArray } private def buildPattern(matcher: Option[String], tpe: ContainerFormats.PatternType): ContainerFormats.Selection.Builder = { @@ -61,19 +64,15 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends Serial if (selectionEnvelope.hasMessageManifest) Some(system.dynamicAccess.getClassFor[AnyRef](selectionEnvelope.getMessageManifest.toStringUtf8).get) else None).get - @tailrec - def reconstruct(remaining: Seq[ContainerFormats.Selection], nextLink: AnyRef): AnyRef = remaining match { - case Nil ⇒ nextLink - case sel :: tail ⇒ - val next = sel.getType match { - case CHILD_NAME ⇒ SelectChildName(sel.getMatcher, nextLink) - case CHILD_PATTERN ⇒ SelectChildPattern(Pattern.compile(sel.getMatcher), nextLink) - case PARENT ⇒ SelectParent(nextLink) - } - reconstruct(tail, next) - } - import scala.collection.JavaConverters._ - reconstruct(selectionEnvelope.getPatternList.asScala.toList.reverse, msg) + val elements: immutable.Iterable[SelectionPathElement] = selectionEnvelope.getPatternList.asScala.map { x ⇒ + x.getType match { + case CHILD_NAME ⇒ SelectChildName(x.getMatcher) + case CHILD_PATTERN ⇒ SelectChildPattern(x.getMatcher) + case PARENT ⇒ SelectParent + } + + }(collection.breakOut) + ActorSelectionMessage(msg, elements) } } diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 91de2f5cfc..7b601a966d 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -384,20 +384,52 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D expectMsgType[ActorSelection] ! Identify(None) expectMsgType[ActorIdentity].ref.get must be theSameInstanceAs l - child ! Identify("idReq1") + grandchild ! ((Props[Echo1], "grandgrandchild")) + val grandgrandchild = expectMsgType[ActorRef] + + system.actorSelection("/user/looker2/child") ! Identify("idReq1") expectMsg(ActorIdentity("idReq1", Some(child))) + system.actorSelection(child.path) ! Identify("idReq2") + expectMsg(ActorIdentity("idReq2", Some(child))) + system.actorSelection("/user/looker2/*") ! Identify("idReq3") + expectMsg(ActorIdentity("idReq3", Some(child))) + + system.actorSelection("/user/looker2/child/grandchild") ! Identify("idReq4") + expectMsg(ActorIdentity("idReq4", Some(grandchild))) + system.actorSelection(child.path / "grandchild") ! Identify("idReq5") + expectMsg(ActorIdentity("idReq5", Some(grandchild))) + system.actorSelection("/user/looker2/*/grandchild") ! Identify("idReq6") + expectMsg(ActorIdentity("idReq6", Some(grandchild))) + system.actorSelection("/user/looker2/child/*") ! Identify("idReq7") + expectMsg(ActorIdentity("idReq7", Some(grandchild))) + system.actorSelection(child.path / "*") ! Identify("idReq8") + expectMsg(ActorIdentity("idReq8", Some(grandchild))) + + system.actorSelection("/user/looker2/child/grandchild/grandgrandchild") ! Identify("idReq9") + expectMsg(ActorIdentity("idReq9", Some(grandgrandchild))) + system.actorSelection(child.path / "grandchild" / "grandgrandchild") ! Identify("idReq10") + expectMsg(ActorIdentity("idReq10", Some(grandgrandchild))) + system.actorSelection("/user/looker2/child/*/grandgrandchild") ! Identify("idReq11") + expectMsg(ActorIdentity("idReq11", Some(grandgrandchild))) + system.actorSelection("/user/looker2/child/*/*") ! Identify("idReq12") + expectMsg(ActorIdentity("idReq12", Some(grandgrandchild))) + system.actorSelection(child.path / "*" / "grandgrandchild") ! Identify("idReq13") + expectMsg(ActorIdentity("idReq13", Some(grandgrandchild))) + + child ! Identify("idReq14") + expectMsg(ActorIdentity("idReq14", Some(child))) watch(child) child ! PoisonPill expectMsg("postStop") expectMsgType[Terminated].actor must be === child l ! ((Props[Echo1], "child")) val child2 = expectMsgType[ActorRef] - child2 ! Identify("idReq2") - expectMsg(ActorIdentity("idReq2", Some(child2))) - system.actorSelection(child.path) ! Identify("idReq3") - expectMsg(ActorIdentity("idReq3", Some(child2))) - child ! Identify("idReq4") - expectMsg(ActorIdentity("idReq4", None)) + child2 ! Identify("idReq15") + expectMsg(ActorIdentity("idReq15", Some(child2))) + system.actorSelection(child.path) ! Identify("idReq16") + expectMsg(ActorIdentity("idReq16", Some(child2))) + child ! Identify("idReq17") + expectMsg(ActorIdentity("idReq17", None)) child2 ! 55 expectMsg(55) diff --git a/akka-remote/src/test/scala/akka/remote/serialization/MessageContainerSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/MessageContainerSerializerSpec.scala new file mode 100644 index 0000000000..dcf0ce31f8 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/serialization/MessageContainerSerializerSpec.scala @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.remote.serialization + +import akka.serialization.SerializationExtension +import com.typesafe.config.ConfigFactory +import akka.testkit.AkkaSpec +import akka.actor.ActorSelectionMessage +import akka.actor.SelectChildName +import akka.actor.SelectParent +import akka.actor.SelectChildPattern +import akka.util.Helpers + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class MessageContainerSerializerSpec extends AkkaSpec { + + val ser = SerializationExtension(system) + + "DaemonMsgCreateSerializer" must { + + "resolve serializer for ActorSelectionMessage" in { + ser.serializerFor(classOf[ActorSelectionMessage]).getClass must be(classOf[MessageContainerSerializer]) + } + + "serialize and de-serialize ActorSelectionMessage" in { + verifySerialization(ActorSelectionMessage("hello", Vector( + SelectChildName("user"), SelectChildName("a"), SelectChildName("b"), SelectParent, + SelectChildPattern("*"), SelectChildName("c")))) + } + + def verifySerialization(msg: AnyRef): Unit = { + ser.deserialize(ser.serialize(msg).get, msg.getClass).get must be(msg) + } + + } +} +