Merge pull request #1813 from akka/wip-3073-faster-actorselection-patriknw
=act,rem #3073 Make ActorSelection faster
This commit is contained in:
commit
032b542c19
13 changed files with 306 additions and 171 deletions
|
|
@ -124,59 +124,6 @@ case object ReceiveTimeout extends ReceiveTimeout {
|
|||
def getInstance = this
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* ActorRefFactory.actorSelection returns a special ref which sends these
|
||||
* nested path descriptions whenever using ! on them, the idea being that the
|
||||
* message is delivered by active routing of the various actors involved.
|
||||
*/
|
||||
private[akka] sealed trait SelectionPath extends AutoReceivedMessage with PossiblyHarmful
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[akka] case class SelectChildName(name: String, next: Any) extends SelectionPath {
|
||||
|
||||
def wrappedMessage: Any = {
|
||||
@tailrec def rec(nx: Any): Any = nx match {
|
||||
case SelectChildName(_, n) ⇒ rec(n)
|
||||
case SelectChildPattern(_, n) ⇒ rec(n)
|
||||
case SelectParent(n) ⇒ rec(n)
|
||||
case x ⇒ x
|
||||
}
|
||||
rec(next)
|
||||
}
|
||||
|
||||
def identifyRequest: Option[Identify] = wrappedMessage match {
|
||||
case x: Identify ⇒ Some(x)
|
||||
case _ ⇒ None
|
||||
}
|
||||
|
||||
def allChildNames: immutable.Iterable[String] = {
|
||||
@tailrec def rec(nx: Any, acc: List[String]): List[String] = nx match {
|
||||
case SelectChildName(name, n) ⇒ rec(n, name :: acc)
|
||||
case SelectChildPattern(_, n) ⇒ throw new IllegalArgumentException("SelectChildPattern not allowed")
|
||||
case SelectParent(n) if acc.isEmpty ⇒ rec(n, acc)
|
||||
case SelectParent(n) ⇒ rec(n, acc.tail)
|
||||
case _ ⇒ acc
|
||||
}
|
||||
rec(this, Nil).reverse
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[akka] case class SelectChildPattern(pattern: Pattern, next: Any) extends SelectionPath
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[akka] case class SelectParent(next: Any) extends SelectionPath
|
||||
|
||||
/**
|
||||
* IllegalActorStateException is thrown when a core invariant in the Actor implementation has been violated.
|
||||
* For instance, if you try to create an Actor that doesn't extend Actor.
|
||||
|
|
|
|||
|
|
@ -243,6 +243,12 @@ private[akka] trait Cell {
|
|||
*/
|
||||
def getChildByName(name: String): Option[ChildStats]
|
||||
|
||||
/**
|
||||
* Method for looking up a single child beneath this actor.
|
||||
* It is racy if called from the outside.
|
||||
*/
|
||||
def getSingleChild(name: String): InternalActorRef
|
||||
|
||||
/**
|
||||
* Enqueue a message to be sent to the actor; may or may not actually
|
||||
* schedule the actor to run, depending on which type of cell it is.
|
||||
|
|
@ -471,30 +477,17 @@ private[akka] class ActorCell(
|
|||
case AddressTerminated(address) ⇒ addressTerminated(address)
|
||||
case Kill ⇒ throw new ActorKilledException("Kill")
|
||||
case PoisonPill ⇒ self.stop()
|
||||
case SelectParent(m) ⇒
|
||||
if (self == system.provider.rootGuardian) self.tell(m, msg.sender)
|
||||
else parent.tell(m, msg.sender)
|
||||
case s @ SelectChildName(name, m) ⇒
|
||||
def selectChild(): Unit = {
|
||||
getChildByName(name) match {
|
||||
case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender)
|
||||
case _ ⇒
|
||||
s.identifyRequest foreach { x ⇒ sender ! ActorIdentity(x.messageId, None) }
|
||||
}
|
||||
}
|
||||
// need this special case because of extraNames handled by rootGuardian
|
||||
if (self == system.provider.rootGuardian) {
|
||||
self.asInstanceOf[LocalActorRef].getSingleChild(name) match {
|
||||
case Nobody ⇒ selectChild()
|
||||
case child ⇒ child.tell(m, msg.sender)
|
||||
}
|
||||
} else
|
||||
selectChild()
|
||||
case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
|
||||
case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, Some(self))
|
||||
case sel: ActorSelectionMessage ⇒ receiveSelection(sel)
|
||||
case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, Some(self))
|
||||
}
|
||||
}
|
||||
|
||||
private def receiveSelection(sel: ActorSelectionMessage): Unit =
|
||||
if (sel.elements.isEmpty)
|
||||
invoke(Envelope(sel.msg, sender, system))
|
||||
else
|
||||
ActorSelection.deliverSelection(self, sender, sel)
|
||||
|
||||
final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package akka.actor
|
||||
|
||||
import scala.collection.immutable
|
||||
import akka.dispatch._
|
||||
import akka.dispatch.sysmsg._
|
||||
import java.lang.{ UnsupportedOperationException, IllegalStateException }
|
||||
|
|
@ -266,6 +267,8 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
|
|||
*/
|
||||
private[akka] abstract class ActorRefWithCell extends InternalActorRef { this: ActorRefScope ⇒
|
||||
def underlying: Cell
|
||||
def children: immutable.Iterable[ActorRef]
|
||||
def getSingleChild(name: String): InternalActorRef
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -343,19 +346,14 @@ private[akka] class LocalActorRef private[akka] (
|
|||
|
||||
override def provider: ActorRefProvider = actorCell.provider
|
||||
|
||||
def children: immutable.Iterable[ActorRef] = actorCell.children
|
||||
|
||||
/**
|
||||
* Method for looking up a single child beneath this actor. Override in order
|
||||
* to inject “synthetic” actor paths like “/temp”.
|
||||
* It is racy if called from the outside.
|
||||
*/
|
||||
def getSingleChild(name: String): InternalActorRef = {
|
||||
val (childName, uid) = ActorCell.splitNameAndUid(name)
|
||||
actorCell.getChildByName(childName) match {
|
||||
case Some(crs: ChildRestartStats) if uid == ActorCell.undefinedUid || uid == crs.uid ⇒
|
||||
crs.child.asInstanceOf[InternalActorRef]
|
||||
case _ ⇒ Nobody
|
||||
}
|
||||
}
|
||||
def getSingleChild(name: String): InternalActorRef = actorCell.getSingleChild(name)
|
||||
|
||||
override def getChild(names: Iterator[String]): InternalActorRef = {
|
||||
/*
|
||||
|
|
@ -507,11 +505,11 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider,
|
|||
case Identify(messageId) ⇒
|
||||
sender ! ActorIdentity(messageId, None)
|
||||
true
|
||||
case s: SelectChildName ⇒
|
||||
s.identifyRequest match {
|
||||
case sel: ActorSelectionMessage ⇒
|
||||
sel.identifyRequest match {
|
||||
case Some(identify) ⇒ sender ! ActorIdentity(identify.messageId, None)
|
||||
case None ⇒
|
||||
eventStream.publish(DeadLetter(s.wrappedMessage, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
|
||||
eventStream.publish(DeadLetter(sel.msg, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
|
||||
}
|
||||
true
|
||||
case _ ⇒ false
|
||||
|
|
|
|||
|
|
@ -26,27 +26,16 @@ import akka.dispatch.ExecutionContexts
|
|||
abstract class ActorSelection extends Serializable {
|
||||
this: ScalaActorSelection ⇒
|
||||
|
||||
import ActorSelection.PatternHolder
|
||||
|
||||
protected[akka] val anchor: ActorRef
|
||||
|
||||
protected val path: immutable.IndexedSeq[AnyRef]
|
||||
protected val path: immutable.IndexedSeq[SelectionPathElement]
|
||||
|
||||
@deprecated("use the two-arg variant (typically getSelf() as second arg)", "2.2")
|
||||
def tell(msg: Any): Unit = tell(msg, Actor.noSender)
|
||||
|
||||
def tell(msg: Any, sender: ActorRef): Unit = {
|
||||
@tailrec def toMessage(msg: Any, path: immutable.IndexedSeq[AnyRef], index: Int): Any =
|
||||
if (index < 0) msg
|
||||
else toMessage(
|
||||
path(index) match {
|
||||
case ".." ⇒ SelectParent(msg)
|
||||
case s: String ⇒ SelectChildName(s, msg)
|
||||
case p: PatternHolder ⇒ SelectChildPattern(p.pat, msg)
|
||||
}, path, index - 1)
|
||||
|
||||
anchor.tell(toMessage(msg, path, path.length - 1), sender)
|
||||
}
|
||||
def tell(msg: Any, sender: ActorRef): Unit =
|
||||
ActorSelection.deliverSelection(anchor.asInstanceOf[InternalActorRef], sender,
|
||||
ActorSelectionMessage(msg, path))
|
||||
|
||||
/**
|
||||
* Resolve the [[ActorRef]] matching this selection.
|
||||
|
|
@ -108,11 +97,6 @@ object ActorSelection {
|
|||
//This cast is safe because the self-type of ActorSelection requires that it mixes in ScalaActorSelection
|
||||
implicit def toScala(sel: ActorSelection): ScalaActorSelection = sel.asInstanceOf[ScalaActorSelection]
|
||||
|
||||
@SerialVersionUID(1L) private case class PatternHolder(str: String) {
|
||||
val pat = Helpers.makePattern(str)
|
||||
override def toString = str
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct an ActorSelection from the given string representing a path
|
||||
* relative to the given target. This operation has to create all the
|
||||
|
|
@ -128,14 +112,68 @@ object ActorSelection {
|
|||
* intention is to send messages frequently.
|
||||
*/
|
||||
def apply(anchorRef: ActorRef, elements: Iterable[String]): ActorSelection = {
|
||||
val compiled: immutable.IndexedSeq[AnyRef] = elements.collect({
|
||||
case x if !x.isEmpty ⇒ if ((x.indexOf('?') != -1) || (x.indexOf('*') != -1)) PatternHolder(x) else x
|
||||
val compiled: immutable.IndexedSeq[SelectionPathElement] = elements.collect({
|
||||
case x if !x.isEmpty ⇒
|
||||
if ((x.indexOf('?') != -1) || (x.indexOf('*') != -1)) SelectChildPattern(x)
|
||||
else if (x == "..") SelectParent
|
||||
else SelectChildName(x)
|
||||
})(scala.collection.breakOut)
|
||||
new ActorSelection with ScalaActorSelection {
|
||||
override val anchor = anchorRef
|
||||
override val path = compiled
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* The receive logic for ActorSelectionMessage. The idea is to recursively descend as far as possible
|
||||
* with local refs and hand over to that “foreign” child when we encounter it.
|
||||
*/
|
||||
private[akka] def deliverSelection(anchor: InternalActorRef, sender: ActorRef, sel: ActorSelectionMessage): Unit =
|
||||
if (sel.elements.isEmpty)
|
||||
anchor.tell(sel.msg, sender)
|
||||
else {
|
||||
val iter = sel.elements.iterator
|
||||
|
||||
@tailrec def rec(ref: InternalActorRef): Unit = {
|
||||
ref match {
|
||||
case refWithCell: ActorRefWithCell ⇒
|
||||
iter.next() match {
|
||||
case SelectParent ⇒
|
||||
val parent = ref.getParent
|
||||
if (iter.isEmpty)
|
||||
parent.tell(sel.msg, sender)
|
||||
else
|
||||
rec(parent)
|
||||
case SelectChildName(name) ⇒
|
||||
val child = refWithCell.getSingleChild(name)
|
||||
if (child == Nobody)
|
||||
sel.identifyRequest foreach { x ⇒ sender ! ActorIdentity(x.messageId, None) }
|
||||
else if (iter.isEmpty)
|
||||
child.tell(sel.msg, sender)
|
||||
else
|
||||
rec(child)
|
||||
case p: SelectChildPattern ⇒
|
||||
// fan-out when there is a wildcard
|
||||
val chldr = refWithCell.children
|
||||
if (iter.isEmpty)
|
||||
for (c ← chldr if p.pattern.matcher(c.path.name).matches)
|
||||
c.tell(sel.msg, sender)
|
||||
else {
|
||||
val m = sel.copy(elements = iter.toVector)
|
||||
for (c ← chldr if p.pattern.matcher(c.path.name).matches)
|
||||
deliverSelection(c.asInstanceOf[InternalActorRef], sender, m)
|
||||
}
|
||||
}
|
||||
|
||||
case _ ⇒
|
||||
// foreign ref, continue by sending ActorSelectionMessage to it with remaining elements
|
||||
ref.tell(sel.copy(elements = iter.toVector), sender)
|
||||
}
|
||||
}
|
||||
|
||||
rec(anchor)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -148,6 +186,53 @@ trait ScalaActorSelection {
|
|||
def !(msg: Any)(implicit sender: ActorRef = Actor.noSender) = tell(msg, sender)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* ActorRefFactory.actorSelection returns a ActorSelection which sends these
|
||||
* nested path descriptions whenever using ! on them, the idea being that the
|
||||
* message is delivered by traversing the various actor paths involved.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[akka] case class ActorSelectionMessage(msg: Any, elements: immutable.Iterable[SelectionPathElement])
|
||||
extends AutoReceivedMessage with PossiblyHarmful {
|
||||
|
||||
def identifyRequest: Option[Identify] = msg match {
|
||||
case x: Identify ⇒ Some(x)
|
||||
case _ ⇒ None
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[akka] sealed trait SelectionPathElement
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(2L)
|
||||
private[akka] case class SelectChildName(name: String) extends SelectionPathElement {
|
||||
override def toString: String = name
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(2L)
|
||||
private[akka] case class SelectChildPattern(patternStr: String) extends SelectionPathElement {
|
||||
val pattern: Pattern = Helpers.makePattern(patternStr)
|
||||
override def toString: String = patternStr
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(2L)
|
||||
private[akka] case object SelectParent extends SelectionPathElement {
|
||||
override def toString: String = ".."
|
||||
}
|
||||
|
||||
/**
|
||||
* When [[ActorSelection#resolveOne]] can't identify the actor the
|
||||
* `Future` is completed with this failure.
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit
|
|||
import java.util.concurrent.locks.ReentrantLock
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
|
||||
import akka.actor.dungeon.ChildrenContainer
|
||||
import akka.event.Logging.Warning
|
||||
|
|
@ -154,6 +155,14 @@ private[akka] class RepointableActorRef(
|
|||
}
|
||||
} else this
|
||||
|
||||
/**
|
||||
* Method for looking up a single child beneath this actor.
|
||||
* It is racy if called from the outside.
|
||||
*/
|
||||
def getSingleChild(name: String): InternalActorRef = lookup.getSingleChild(name)
|
||||
|
||||
def children: immutable.Iterable[ActorRef] = lookup.childrenRefs.children
|
||||
|
||||
def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = underlying.sendMessage(message, sender)
|
||||
|
||||
def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message)
|
||||
|
|
@ -204,6 +213,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl,
|
|||
def parent: InternalActorRef = supervisor
|
||||
def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer
|
||||
def getChildByName(name: String): Option[ChildRestartStats] = None
|
||||
override def getSingleChild(name: String): InternalActorRef = Nobody
|
||||
|
||||
def sendMessage(msg: Envelope): Unit = {
|
||||
if (lock.tryLock(timeout.length, timeout.unit)) {
|
||||
|
|
|
|||
|
|
@ -135,6 +135,22 @@ private[akka] trait Children { this: ActorCell ⇒
|
|||
|
||||
protected def getAllChildStats: immutable.Iterable[ChildRestartStats] = childrenRefs.stats
|
||||
|
||||
override def getSingleChild(name: String): InternalActorRef =
|
||||
if (name.indexOf('#') == -1) {
|
||||
// optimization for the non-uid case
|
||||
getChildByName(name) match {
|
||||
case Some(crs: ChildRestartStats) ⇒ crs.child.asInstanceOf[InternalActorRef]
|
||||
case _ ⇒ Nobody
|
||||
}
|
||||
} else {
|
||||
val (childName, uid) = ActorCell.splitNameAndUid(name)
|
||||
getChildByName(childName) match {
|
||||
case Some(crs: ChildRestartStats) if uid == ActorCell.undefinedUid || uid == crs.uid ⇒
|
||||
crs.child.asInstanceOf[InternalActorRef]
|
||||
case _ ⇒ Nobody
|
||||
}
|
||||
}
|
||||
|
||||
protected def removeChildAndGetStateChange(child: ActorRef): Option[SuspendReason] = {
|
||||
@tailrec def removeChild(ref: ActorRef): ChildrenContainer = {
|
||||
val c = childrenRefs
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ akka {
|
|||
# Since com.google.protobuf.Message does not extend Serializable but
|
||||
# GeneratedMessage does, need to use the more specific one here in order
|
||||
# to avoid ambiguity
|
||||
"akka.actor.SelectionPath" = akka-containers
|
||||
"akka.actor.ActorSelectionMessage" = akka-containers
|
||||
"com.google.protobuf.GeneratedMessage" = proto
|
||||
"akka.remote.DaemonMsgCreate" = daemon-create
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,7 +74,10 @@ private[remote] class DefaultMessageDispatcher(private val system: ExtendedActor
|
|||
case msg: PossiblyHarmful if UntrustedMode ⇒
|
||||
log.debug("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type {}", msg.getClass)
|
||||
case msg: SystemMessage ⇒ l.sendSystemMessage(msg)
|
||||
case msg ⇒ l.!(msg)(sender)
|
||||
case sel: ActorSelectionMessage ⇒
|
||||
// run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
|
||||
ActorSelection.deliverSelection(l, sender, sel)
|
||||
case msg ⇒ l.!(msg)(sender)
|
||||
}
|
||||
|
||||
case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode ⇒
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import akka.actor.ActorRefWithCell
|
|||
import akka.actor.ActorRefScope
|
||||
import akka.util.Switch
|
||||
import akka.actor.RootActorPath
|
||||
import akka.actor.ActorSelectionMessage
|
||||
import akka.actor.SelectParent
|
||||
import akka.actor.SelectChildName
|
||||
import akka.actor.SelectChildPattern
|
||||
|
|
@ -120,18 +121,30 @@ private[akka] class RemoteSystemDaemon(
|
|||
}
|
||||
}
|
||||
|
||||
case SelectParent(m) ⇒ getParent.tell(m, sender)
|
||||
|
||||
case s @ SelectChildName(name, m) ⇒
|
||||
getChild(s.allChildNames.iterator) match {
|
||||
case Nobody ⇒
|
||||
s.identifyRequest foreach { x ⇒ sender ! ActorIdentity(x.messageId, None) }
|
||||
case child ⇒
|
||||
child.tell(s.wrappedMessage, sender)
|
||||
case sel: ActorSelectionMessage ⇒
|
||||
val (concatenatedChildNames, m) = {
|
||||
val iter = sel.elements.iterator
|
||||
// find child elements, and the message to send, which is a remaining ActorSelectionMessage
|
||||
// in case of SelectChildPattern, otherwise the the actual message of the selection
|
||||
@tailrec def rec(acc: List[String]): (List[String], Any) =
|
||||
if (iter.isEmpty)
|
||||
(acc.reverse, sel.msg)
|
||||
else {
|
||||
iter.next() match {
|
||||
case SelectChildName(name) ⇒ rec(name :: acc)
|
||||
case SelectParent if acc.isEmpty ⇒ rec(acc)
|
||||
case SelectParent ⇒ rec(acc.tail)
|
||||
case pat: SelectChildPattern ⇒ (acc.reverse, sel.copy(elements = pat +: iter.toVector))
|
||||
}
|
||||
}
|
||||
rec(Nil)
|
||||
}
|
||||
getChild(concatenatedChildNames.iterator) match {
|
||||
case Nobody ⇒
|
||||
sel.identifyRequest foreach { x ⇒ sender ! ActorIdentity(x.messageId, None) }
|
||||
case child ⇒
|
||||
child.tell(m, sender)
|
||||
}
|
||||
|
||||
case SelectChildPattern(p, m) ⇒
|
||||
log.error("SelectChildPattern not allowed in actorSelection of remote deployed actors")
|
||||
|
||||
case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, Some(this))
|
||||
|
||||
|
|
|
|||
|
|
@ -5,12 +5,12 @@ package akka.remote
|
|||
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import scala.annotation.tailrec
|
||||
import akka.actor.ActorSelectionMessage
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.Extension
|
||||
import akka.actor.ExtensionId
|
||||
import akka.actor.ExtensionIdProvider
|
||||
import akka.actor.SelectChildName
|
||||
import akka.event.Logging
|
||||
import akka.routing.RouterEnvelope
|
||||
|
||||
|
|
@ -63,9 +63,9 @@ private[akka] class RemoteMetricsOn(system: ExtendedActorSystem) extends RemoteM
|
|||
override def logPayloadBytes(msg: Any, payloadBytes: Int): Unit =
|
||||
if (payloadBytes >= logFrameSizeExceeding) {
|
||||
val clazz = msg match {
|
||||
case x: SelectChildName ⇒ x.wrappedMessage.getClass
|
||||
case x: RouterEnvelope ⇒ x.message.getClass
|
||||
case _ ⇒ msg.getClass
|
||||
case x: ActorSelectionMessage ⇒ x.msg.getClass
|
||||
case x: RouterEnvelope ⇒ x.message.getClass
|
||||
case _ ⇒ msg.getClass
|
||||
}
|
||||
|
||||
// 10% threshold until next log
|
||||
|
|
|
|||
|
|
@ -3,18 +3,17 @@
|
|||
*/
|
||||
package akka.remote.serialization
|
||||
|
||||
import akka.serialization.{ SerializationExtension, Serializer }
|
||||
import akka.actor._
|
||||
import scala.annotation.tailrec
|
||||
import akka.remote.ContainerFormats
|
||||
import akka.remote.ContainerFormats.SelectionEnvelope
|
||||
import akka.remote.WireFormats.SerializedMessage
|
||||
import scala.collection.immutable
|
||||
import com.google.protobuf.ByteString
|
||||
import akka.actor.ActorSelectionMessage
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.SelectChildName
|
||||
import akka.actor.SelectChildPattern
|
||||
import akka.actor.SelectParent
|
||||
import akka.actor.SelectChildName
|
||||
import scala.Some
|
||||
import java.util.regex.Pattern
|
||||
import akka.actor.SelectionPathElement
|
||||
import akka.remote.ContainerFormats
|
||||
import akka.serialization.SerializationExtension
|
||||
import akka.serialization.Serializer
|
||||
|
||||
class MessageContainerSerializer(val system: ExtendedActorSystem) extends Serializer {
|
||||
|
||||
|
|
@ -23,28 +22,32 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends Serial
|
|||
def includeManifest: Boolean = false
|
||||
|
||||
def toBinary(obj: AnyRef): Array[Byte] = obj match {
|
||||
case path: SelectionPath ⇒ serializeSelectionPath(path, ContainerFormats.SelectionEnvelope.newBuilder()).build().toByteArray
|
||||
case _ ⇒ throw new IllegalArgumentException(s"Cannot serialize object of type [${obj.getClass.getName}]")
|
||||
case sel: ActorSelectionMessage ⇒ serializeSelection(sel)
|
||||
case _ ⇒ throw new IllegalArgumentException(s"Cannot serialize object of type [${obj.getClass.getName}]")
|
||||
}
|
||||
|
||||
import ContainerFormats.PatternType._
|
||||
|
||||
@tailrec
|
||||
private def serializeSelectionPath(path: Any, builder: SelectionEnvelope.Builder): SelectionEnvelope.Builder = path match {
|
||||
case SelectChildName(name, next) ⇒
|
||||
serializeSelectionPath(next, builder.addPattern(buildPattern(Some(name), CHILD_NAME)))
|
||||
case SelectChildPattern(pattern, next) ⇒
|
||||
serializeSelectionPath(next, builder.addPattern(buildPattern(Some(pattern.toString), CHILD_PATTERN)))
|
||||
case SelectParent(next) ⇒
|
||||
serializeSelectionPath(next, builder.addPattern(buildPattern(None, PARENT)))
|
||||
case message: AnyRef ⇒
|
||||
val serializer = SerializationExtension(system).findSerializerFor(message)
|
||||
builder
|
||||
.setEnclosedMessage(ByteString.copyFrom(serializer.toBinary(message)))
|
||||
.setSerializerId(serializer.identifier)
|
||||
if (serializer.includeManifest)
|
||||
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
|
||||
builder
|
||||
private def serializeSelection(sel: ActorSelectionMessage): Array[Byte] = {
|
||||
val builder = ContainerFormats.SelectionEnvelope.newBuilder()
|
||||
val message = sel.msg.asInstanceOf[AnyRef]
|
||||
val serializer = SerializationExtension(system).findSerializerFor(message)
|
||||
builder.
|
||||
setEnclosedMessage(ByteString.copyFrom(serializer.toBinary(message))).
|
||||
setSerializerId(serializer.identifier)
|
||||
if (serializer.includeManifest)
|
||||
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
|
||||
|
||||
sel.elements.foreach {
|
||||
case SelectChildName(name) ⇒
|
||||
builder.addPattern(buildPattern(Some(name), CHILD_NAME))
|
||||
case SelectChildPattern(patternStr) ⇒
|
||||
builder.addPattern(buildPattern(Some(patternStr), CHILD_PATTERN))
|
||||
case SelectParent ⇒
|
||||
builder.addPattern(buildPattern(None, PARENT))
|
||||
}
|
||||
|
||||
builder.build().toByteArray
|
||||
}
|
||||
|
||||
private def buildPattern(matcher: Option[String], tpe: ContainerFormats.PatternType): ContainerFormats.Selection.Builder = {
|
||||
|
|
@ -61,19 +64,15 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends Serial
|
|||
if (selectionEnvelope.hasMessageManifest)
|
||||
Some(system.dynamicAccess.getClassFor[AnyRef](selectionEnvelope.getMessageManifest.toStringUtf8).get) else None).get
|
||||
|
||||
@tailrec
|
||||
def reconstruct(remaining: Seq[ContainerFormats.Selection], nextLink: AnyRef): AnyRef = remaining match {
|
||||
case Nil ⇒ nextLink
|
||||
case sel :: tail ⇒
|
||||
val next = sel.getType match {
|
||||
case CHILD_NAME ⇒ SelectChildName(sel.getMatcher, nextLink)
|
||||
case CHILD_PATTERN ⇒ SelectChildPattern(Pattern.compile(sel.getMatcher), nextLink)
|
||||
case PARENT ⇒ SelectParent(nextLink)
|
||||
}
|
||||
reconstruct(tail, next)
|
||||
}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
reconstruct(selectionEnvelope.getPatternList.asScala.toList.reverse, msg)
|
||||
val elements: immutable.Iterable[SelectionPathElement] = selectionEnvelope.getPatternList.asScala.map { x ⇒
|
||||
x.getType match {
|
||||
case CHILD_NAME ⇒ SelectChildName(x.getMatcher)
|
||||
case CHILD_PATTERN ⇒ SelectChildPattern(x.getMatcher)
|
||||
case PARENT ⇒ SelectParent
|
||||
}
|
||||
|
||||
}(collection.breakOut)
|
||||
ActorSelectionMessage(msg, elements)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -384,20 +384,52 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
expectMsgType[ActorSelection] ! Identify(None)
|
||||
expectMsgType[ActorIdentity].ref.get must be theSameInstanceAs l
|
||||
|
||||
child ! Identify("idReq1")
|
||||
grandchild ! ((Props[Echo1], "grandgrandchild"))
|
||||
val grandgrandchild = expectMsgType[ActorRef]
|
||||
|
||||
system.actorSelection("/user/looker2/child") ! Identify("idReq1")
|
||||
expectMsg(ActorIdentity("idReq1", Some(child)))
|
||||
system.actorSelection(child.path) ! Identify("idReq2")
|
||||
expectMsg(ActorIdentity("idReq2", Some(child)))
|
||||
system.actorSelection("/user/looker2/*") ! Identify("idReq3")
|
||||
expectMsg(ActorIdentity("idReq3", Some(child)))
|
||||
|
||||
system.actorSelection("/user/looker2/child/grandchild") ! Identify("idReq4")
|
||||
expectMsg(ActorIdentity("idReq4", Some(grandchild)))
|
||||
system.actorSelection(child.path / "grandchild") ! Identify("idReq5")
|
||||
expectMsg(ActorIdentity("idReq5", Some(grandchild)))
|
||||
system.actorSelection("/user/looker2/*/grandchild") ! Identify("idReq6")
|
||||
expectMsg(ActorIdentity("idReq6", Some(grandchild)))
|
||||
system.actorSelection("/user/looker2/child/*") ! Identify("idReq7")
|
||||
expectMsg(ActorIdentity("idReq7", Some(grandchild)))
|
||||
system.actorSelection(child.path / "*") ! Identify("idReq8")
|
||||
expectMsg(ActorIdentity("idReq8", Some(grandchild)))
|
||||
|
||||
system.actorSelection("/user/looker2/child/grandchild/grandgrandchild") ! Identify("idReq9")
|
||||
expectMsg(ActorIdentity("idReq9", Some(grandgrandchild)))
|
||||
system.actorSelection(child.path / "grandchild" / "grandgrandchild") ! Identify("idReq10")
|
||||
expectMsg(ActorIdentity("idReq10", Some(grandgrandchild)))
|
||||
system.actorSelection("/user/looker2/child/*/grandgrandchild") ! Identify("idReq11")
|
||||
expectMsg(ActorIdentity("idReq11", Some(grandgrandchild)))
|
||||
system.actorSelection("/user/looker2/child/*/*") ! Identify("idReq12")
|
||||
expectMsg(ActorIdentity("idReq12", Some(grandgrandchild)))
|
||||
system.actorSelection(child.path / "*" / "grandgrandchild") ! Identify("idReq13")
|
||||
expectMsg(ActorIdentity("idReq13", Some(grandgrandchild)))
|
||||
|
||||
child ! Identify("idReq14")
|
||||
expectMsg(ActorIdentity("idReq14", Some(child)))
|
||||
watch(child)
|
||||
child ! PoisonPill
|
||||
expectMsg("postStop")
|
||||
expectMsgType[Terminated].actor must be === child
|
||||
l ! ((Props[Echo1], "child"))
|
||||
val child2 = expectMsgType[ActorRef]
|
||||
child2 ! Identify("idReq2")
|
||||
expectMsg(ActorIdentity("idReq2", Some(child2)))
|
||||
system.actorSelection(child.path) ! Identify("idReq3")
|
||||
expectMsg(ActorIdentity("idReq3", Some(child2)))
|
||||
child ! Identify("idReq4")
|
||||
expectMsg(ActorIdentity("idReq4", None))
|
||||
child2 ! Identify("idReq15")
|
||||
expectMsg(ActorIdentity("idReq15", Some(child2)))
|
||||
system.actorSelection(child.path) ! Identify("idReq16")
|
||||
expectMsg(ActorIdentity("idReq16", Some(child2)))
|
||||
child ! Identify("idReq17")
|
||||
expectMsg(ActorIdentity("idReq17", None))
|
||||
|
||||
child2 ! 55
|
||||
expectMsg(55)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,39 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.remote.serialization
|
||||
|
||||
import akka.serialization.SerializationExtension
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.ActorSelectionMessage
|
||||
import akka.actor.SelectChildName
|
||||
import akka.actor.SelectParent
|
||||
import akka.actor.SelectChildPattern
|
||||
import akka.util.Helpers
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class MessageContainerSerializerSpec extends AkkaSpec {
|
||||
|
||||
val ser = SerializationExtension(system)
|
||||
|
||||
"DaemonMsgCreateSerializer" must {
|
||||
|
||||
"resolve serializer for ActorSelectionMessage" in {
|
||||
ser.serializerFor(classOf[ActorSelectionMessage]).getClass must be(classOf[MessageContainerSerializer])
|
||||
}
|
||||
|
||||
"serialize and de-serialize ActorSelectionMessage" in {
|
||||
verifySerialization(ActorSelectionMessage("hello", Vector(
|
||||
SelectChildName("user"), SelectChildName("a"), SelectChildName("b"), SelectParent,
|
||||
SelectChildPattern("*"), SelectChildName("c"))))
|
||||
}
|
||||
|
||||
def verifySerialization(msg: AnyRef): Unit = {
|
||||
ser.deserialize(ser.serialize(msg).get, msg.getClass).get must be(msg)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue