implement coherent actorFor look-up
- look-up of all actor paths in the system, even “synthetic” ones like “/temp” - look-up by full URI (akka://bla/...), absolute or relative path - look-up by ActorPath - look-up by path elements - look-up relative to context where applicable, supporting ".." - proper management of AskActorRef Have a look at ActorLookupSpec to see what it can do.
This commit is contained in:
parent
a3e6fca530
commit
79e5c5d0d1
10 changed files with 406 additions and 37 deletions
229
akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala
Normal file
229
akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala
Normal file
|
|
@ -0,0 +1,229 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.actor
|
||||||
|
|
||||||
|
import akka.testkit._
|
||||||
|
|
||||||
|
object ActorLookupSpec {
|
||||||
|
|
||||||
|
case class Create(child: String)
|
||||||
|
|
||||||
|
trait Query
|
||||||
|
case class LookupElems(path: Iterable[String]) extends Query
|
||||||
|
case class LookupString(path: String) extends Query
|
||||||
|
case class LookupPath(path: ActorPath) extends Query
|
||||||
|
case class GetSender(to: ActorRef) extends Query
|
||||||
|
|
||||||
|
val p = Props[Node]
|
||||||
|
|
||||||
|
class Node extends Actor {
|
||||||
|
def receive = {
|
||||||
|
case Create(name) ⇒ sender ! context.actorOf(p, name)
|
||||||
|
case LookupElems(path) ⇒ sender ! context.actorFor(path)
|
||||||
|
case LookupString(path) ⇒ sender ! context.actorFor(path)
|
||||||
|
case LookupPath(path) ⇒ sender ! context.actorFor(path)
|
||||||
|
case GetSender(ref) ⇒ ref ! sender
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
|
class ActorLookupSpec extends AkkaSpec {
|
||||||
|
import ActorLookupSpec._
|
||||||
|
|
||||||
|
val c1 = system.actorOf(p, "c1")
|
||||||
|
val c2 = system.actorOf(p, "c2")
|
||||||
|
val c21 = (c2 ? Create("c21")).as[ActorRef].get
|
||||||
|
|
||||||
|
val user = system.asInstanceOf[ActorSystemImpl].guardian
|
||||||
|
val syst = system.asInstanceOf[ActorSystemImpl].systemGuardian
|
||||||
|
val root = system.asInstanceOf[ActorSystemImpl].lookupRoot
|
||||||
|
|
||||||
|
"An ActorSystem" must {
|
||||||
|
|
||||||
|
"find actors by looking up their path" in {
|
||||||
|
system.actorFor(c1.path) must be === c1
|
||||||
|
system.actorFor(c2.path) must be === c2
|
||||||
|
system.actorFor(c21.path) must be === c21
|
||||||
|
system.actorFor(system / "c1") must be === c1
|
||||||
|
system.actorFor(system / "c2") must be === c2
|
||||||
|
system.actorFor(system / "c2" / "c21") must be === c21
|
||||||
|
system.actorFor(system / Seq("c2", "c21")) must be === c21
|
||||||
|
}
|
||||||
|
|
||||||
|
"find actors by looking up their string representation" in {
|
||||||
|
system.actorFor(c1.path.toString) must be === c1
|
||||||
|
system.actorFor(c2.path.toString) must be === c2
|
||||||
|
system.actorFor(c21.path.toString) must be === c21
|
||||||
|
}
|
||||||
|
|
||||||
|
"find actors by looking up their root-anchored relative path" in {
|
||||||
|
system.actorFor(c1.path.pathElements.mkString("/", "/", "")) must be === c1
|
||||||
|
system.actorFor(c2.path.pathElements.mkString("/", "/", "")) must be === c2
|
||||||
|
system.actorFor(c21.path.pathElements.mkString("/", "/", "")) must be === c21
|
||||||
|
}
|
||||||
|
|
||||||
|
"find actors by looking up their relative path" in {
|
||||||
|
system.actorFor(c1.path.pathElements.mkString("/")) must be === c1
|
||||||
|
system.actorFor(c2.path.pathElements.mkString("/")) must be === c2
|
||||||
|
system.actorFor(c21.path.pathElements.mkString("/")) must be === c21
|
||||||
|
}
|
||||||
|
|
||||||
|
"find actors by looking up their path elements" in {
|
||||||
|
system.actorFor(c1.path.pathElements) must be === c1
|
||||||
|
system.actorFor(c2.path.pathElements) must be === c2
|
||||||
|
system.actorFor(c21.path.pathElements) must be === c21
|
||||||
|
}
|
||||||
|
|
||||||
|
"find system-generated actors" in {
|
||||||
|
system.actorFor("/user") must be === user
|
||||||
|
system.actorFor("/null") must be === system.deadLetters
|
||||||
|
system.actorFor("/system") must be === syst
|
||||||
|
system.actorFor(syst.path) must be === syst
|
||||||
|
system.actorFor(syst.path.toString) must be === syst
|
||||||
|
system.actorFor("/") must be === root
|
||||||
|
system.actorFor("..") must be === root
|
||||||
|
system.actorFor(root.path) must be === root
|
||||||
|
system.actorFor(root.path.toString) must be === root
|
||||||
|
system.actorFor("user") must be === user
|
||||||
|
system.actorFor("null") must be === system.deadLetters
|
||||||
|
system.actorFor("system") must be === syst
|
||||||
|
system.actorFor("user/") must be === user
|
||||||
|
system.actorFor("null/") must be === system.deadLetters
|
||||||
|
system.actorFor("system/") must be === syst
|
||||||
|
}
|
||||||
|
|
||||||
|
"return deadLetters for non-existing paths" in {
|
||||||
|
system.actorFor("a/b/c") must be === system.deadLetters
|
||||||
|
system.actorFor("") must be === system.deadLetters
|
||||||
|
system.actorFor("akka://all-systems/Nobody") must be === system.deadLetters
|
||||||
|
system.actorFor("akka://all-systems/user") must be === system.deadLetters
|
||||||
|
system.actorFor(system / "hallo") must be === system.deadLetters
|
||||||
|
system.actorFor(Seq()) must be === system.deadLetters
|
||||||
|
system.actorFor(Seq("a")) must be === system.deadLetters
|
||||||
|
}
|
||||||
|
|
||||||
|
"find temporary actors" in {
|
||||||
|
val f = c1 ? GetSender(testActor)
|
||||||
|
val a = expectMsgType[ActorRef]
|
||||||
|
a.path.pathElements.head must be === "temp"
|
||||||
|
system.actorFor(a.path) must be === a
|
||||||
|
system.actorFor(a.path.toString) must be === a
|
||||||
|
system.actorFor(a.path.pathElements) must be === a
|
||||||
|
system.actorFor(a.path.toString + "/") must be === a
|
||||||
|
system.actorFor(a.path.toString + "/hallo") must be === system.deadLetters
|
||||||
|
f.isCompleted must be === false
|
||||||
|
a ! 42
|
||||||
|
f.isCompleted must be === true
|
||||||
|
f.get must be === 42
|
||||||
|
system.actorFor(a.path) must be === system.deadLetters
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
"An ActorContext" must {
|
||||||
|
|
||||||
|
val all = Seq(c1, c2, c21)
|
||||||
|
|
||||||
|
"find actors by looking up their path" in {
|
||||||
|
def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) {
|
||||||
|
(looker ? LookupPath(pathOf.path)).get must be === result
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
looker ← all
|
||||||
|
target ← all
|
||||||
|
} check(looker, target, target)
|
||||||
|
}
|
||||||
|
|
||||||
|
"find actors by looking up their string representation" in {
|
||||||
|
def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) {
|
||||||
|
(looker ? LookupString(pathOf.path.toString)).get must be === result
|
||||||
|
(looker ? LookupString(pathOf.path.toString + "/")).get must be === result
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
looker ← all
|
||||||
|
target ← all
|
||||||
|
} check(looker, target, target)
|
||||||
|
}
|
||||||
|
|
||||||
|
"find actors by looking up their root-anchored relative path" in {
|
||||||
|
def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) {
|
||||||
|
(looker ? LookupString(pathOf.path.pathElements.mkString("/", "/", ""))).get must be === result
|
||||||
|
(looker ? LookupString(pathOf.path.pathElements.mkString("/", "/", "/"))).get must be === result
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
looker ← all
|
||||||
|
target ← all
|
||||||
|
} check(looker, target, target)
|
||||||
|
}
|
||||||
|
|
||||||
|
"find actors by looking up their relative path" in {
|
||||||
|
def check(looker: ActorRef, result: ActorRef, elems: String*) {
|
||||||
|
(looker ? LookupElems(elems)).get must be === result
|
||||||
|
(looker ? LookupString(elems mkString "/")).get must be === result
|
||||||
|
(looker ? LookupString(elems mkString ("", "/", "/"))).get must be === result
|
||||||
|
}
|
||||||
|
check(c1, user, "..")
|
||||||
|
for {
|
||||||
|
looker ← Seq(c1, c2)
|
||||||
|
target ← all
|
||||||
|
} check(looker, target, Seq("..") ++ target.path.pathElements.drop(1): _*)
|
||||||
|
check(c21, user, "..", "..")
|
||||||
|
check(c21, root, "..", "..", "..")
|
||||||
|
check(c21, root, "..", "..", "..", "..")
|
||||||
|
}
|
||||||
|
|
||||||
|
"find system-generated actors" in {
|
||||||
|
def check(target: ActorRef) {
|
||||||
|
for (looker ← all) {
|
||||||
|
(looker ? LookupPath(target.path)).get must be === target
|
||||||
|
(looker ? LookupString(target.path.toString)).get must be === target
|
||||||
|
(looker ? LookupString(target.path.toString + "/")).get must be === target
|
||||||
|
(looker ? LookupString(target.path.pathElements.mkString("/", "/", ""))).get must be === target
|
||||||
|
if (target != root) (looker ? LookupString(target.path.pathElements.mkString("/", "/", "/"))).get must be === target
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (target ← Seq(root, syst, user, system.deadLetters)) check(target)
|
||||||
|
}
|
||||||
|
|
||||||
|
"return deadLetters for non-existing paths" in {
|
||||||
|
def checkOne(looker: ActorRef, query: Query) {
|
||||||
|
(looker ? query).get must be === system.deadLetters
|
||||||
|
}
|
||||||
|
def check(looker: ActorRef) {
|
||||||
|
Seq(LookupString("a/b/c"),
|
||||||
|
LookupString(""),
|
||||||
|
LookupString("akka://all-systems/Nobody"),
|
||||||
|
LookupPath(system / "hallo"),
|
||||||
|
LookupElems(Seq()),
|
||||||
|
LookupElems(Seq("a"))) foreach (checkOne(looker, _))
|
||||||
|
}
|
||||||
|
for (looker ← all) check(looker)
|
||||||
|
}
|
||||||
|
|
||||||
|
"find temporary actors" in {
|
||||||
|
val f = c1 ? GetSender(testActor)
|
||||||
|
val a = expectMsgType[ActorRef]
|
||||||
|
a.path.pathElements.head must be === "temp"
|
||||||
|
(c2 ? LookupPath(a.path)).get must be === a
|
||||||
|
(c2 ? LookupString(a.path.toString)).get must be === a
|
||||||
|
(c2 ? LookupString(a.path.pathElements.mkString("/", "/", ""))).get must be === a
|
||||||
|
println("start")
|
||||||
|
(c2 ? LookupString("../../" + a.path.pathElements.mkString("/"))).get must be === a
|
||||||
|
(c2 ? LookupString(a.path.toString + "/")).get must be === a
|
||||||
|
(c2 ? LookupString(a.path.pathElements.mkString("/", "/", "") + "/")).get must be === a
|
||||||
|
(c2 ? LookupString("../../" + a.path.pathElements.mkString("/") + "/")).get must be === a
|
||||||
|
(c2 ? LookupElems(Seq("..", "..") ++ a.path.pathElements)).get must be === a
|
||||||
|
(c2 ? LookupElems(Seq("..", "..") ++ a.path.pathElements :+ "")).get must be === a
|
||||||
|
f.isCompleted must be === false
|
||||||
|
a ! 42
|
||||||
|
f.isCompleted must be === true
|
||||||
|
f.get must be === 42
|
||||||
|
(c2 ? LookupPath(a.path)).get must be === system.deadLetters
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -85,6 +85,8 @@ private[akka] class ActorCell(
|
||||||
|
|
||||||
protected final def guardian = self
|
protected final def guardian = self
|
||||||
|
|
||||||
|
protected final def lookupRoot = self
|
||||||
|
|
||||||
final def provider = system.provider
|
final def provider = system.provider
|
||||||
|
|
||||||
override def receiveTimeout: Option[Long] = if (receiveTimeoutData._1 > 0) Some(receiveTimeoutData._1) else None
|
override def receiveTimeout: Option[Long] = if (receiveTimeoutData._1 > 0) Some(receiveTimeoutData._1) else None
|
||||||
|
|
@ -97,6 +99,8 @@ private[akka] class ActorCell(
|
||||||
var receiveTimeoutData: (Long, Cancellable) =
|
var receiveTimeoutData: (Long, Cancellable) =
|
||||||
if (_receiveTimeout.isDefined) (_receiveTimeout.get, emptyCancellable) else emptyReceiveTimeoutData
|
if (_receiveTimeout.isDefined) (_receiveTimeout.get, emptyCancellable) else emptyReceiveTimeoutData
|
||||||
|
|
||||||
|
// this is accessed without further synchronization during actorFor look-ups
|
||||||
|
@volatile
|
||||||
var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs
|
var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs
|
||||||
|
|
||||||
protected def isDuplicate(name: String): Boolean = {
|
protected def isDuplicate(name: String): Boolean = {
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,18 @@ import scala.annotation.tailrec
|
||||||
object ActorPath {
|
object ActorPath {
|
||||||
// this cannot really be changed due to usage of standard URI syntax
|
// this cannot really be changed due to usage of standard URI syntax
|
||||||
final val separator = "/"
|
final val separator = "/"
|
||||||
|
final val sepLen = separator.length
|
||||||
|
|
||||||
|
def split(s: String): List[String] = {
|
||||||
|
@tailrec
|
||||||
|
def rec(pos: Int, acc: List[String]): List[String] = {
|
||||||
|
val from = s.lastIndexOf(separator, pos - 1)
|
||||||
|
val sub = s.substring(from + sepLen, pos)
|
||||||
|
val l = sub :: acc
|
||||||
|
if (from == -1) l else rec(from, l)
|
||||||
|
}
|
||||||
|
rec(s.length, Nil)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -70,7 +82,7 @@ final case class RootActorPath(address: Address, name: String = ActorPath.separa
|
||||||
|
|
||||||
def /(child: String): ActorPath = new ChildActorPath(this, child)
|
def /(child: String): ActorPath = new ChildActorPath(this, child)
|
||||||
|
|
||||||
def pathElements: Iterable[String] = Iterable.empty
|
val pathElements: Iterable[String] = List("")
|
||||||
|
|
||||||
override val toString = address + name
|
override val toString = address + name
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ import akka.remote.RemoteAddress
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import akka.event.EventStream
|
import akka.event.EventStream
|
||||||
import akka.event.DeathWatch
|
import akka.event.DeathWatch
|
||||||
|
import scala.annotation.tailrec
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ActorRef is an immutable and serializable handle to an Actor.
|
* ActorRef is an immutable and serializable handle to an Actor.
|
||||||
|
|
@ -166,6 +167,20 @@ private[akka] trait InternalActorRef extends ActorRef with ScalaActorRef {
|
||||||
def suspend(): Unit
|
def suspend(): Unit
|
||||||
def restart(cause: Throwable): Unit
|
def restart(cause: Throwable): Unit
|
||||||
def sendSystemMessage(message: SystemMessage): Unit
|
def sendSystemMessage(message: SystemMessage): Unit
|
||||||
|
def getParent: InternalActorRef
|
||||||
|
/**
|
||||||
|
* Obtain ActorRef by possibly traversing the actor tree or looking it up at
|
||||||
|
* some provider-specific location. This method shall return the end result,
|
||||||
|
* i.e. not only the next step in the look-up; this will typically involve
|
||||||
|
* recursive invocation. A path element of ".." signifies the parent, a
|
||||||
|
* trailing "" element must be disregarded. If the requested path does not
|
||||||
|
* exist, return Nobody.
|
||||||
|
*/
|
||||||
|
def getChild(name: Iterable[String]): InternalActorRef
|
||||||
|
}
|
||||||
|
|
||||||
|
private[akka] case object Nobody extends MinimalActorRef {
|
||||||
|
val path = new RootActorPath(new LocalAddress("all-systems"), "/Nobody")
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -223,6 +238,40 @@ class LocalActorRef private[akka] (
|
||||||
*/
|
*/
|
||||||
def stop(): Unit = actorCell.stop()
|
def stop(): Unit = actorCell.stop()
|
||||||
|
|
||||||
|
def getParent: InternalActorRef = actorCell.parent
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method for looking up a single child beneath this actor. Override in order
|
||||||
|
* to inject “synthetic” actor paths like “/temp”.
|
||||||
|
*/
|
||||||
|
protected def getSingleChild(name: String): InternalActorRef = {
|
||||||
|
val children = actorCell.childrenRefs
|
||||||
|
if (children contains name) children(name).child.asInstanceOf[InternalActorRef]
|
||||||
|
else Nobody
|
||||||
|
}
|
||||||
|
|
||||||
|
def getChild(names: Iterable[String]): InternalActorRef = {
|
||||||
|
/*
|
||||||
|
* The idea is to recursively descend as far as possible with LocalActor
|
||||||
|
* Refs and hand over to that “foreign” child when we encounter it.
|
||||||
|
*/
|
||||||
|
@tailrec
|
||||||
|
def rec(ref: InternalActorRef, name: Iterable[String]): InternalActorRef = ref match {
|
||||||
|
case l: LocalActorRef ⇒
|
||||||
|
val n = name.head
|
||||||
|
val rest = name.tail
|
||||||
|
val next = n match {
|
||||||
|
case ".." ⇒ l.getParent
|
||||||
|
case "" ⇒ l
|
||||||
|
case _ ⇒ l.getSingleChild(n)
|
||||||
|
}
|
||||||
|
if (next == Nobody || rest.isEmpty) next else rec(next, rest)
|
||||||
|
case _ ⇒
|
||||||
|
ref.getChild(name)
|
||||||
|
}
|
||||||
|
rec(this, names)
|
||||||
|
}
|
||||||
|
|
||||||
// ========= AKKA PROTECTED FUNCTIONS =========
|
// ========= AKKA PROTECTED FUNCTIONS =========
|
||||||
|
|
||||||
protected[akka] def underlying: ActorCell = actorCell
|
protected[akka] def underlying: ActorCell = actorCell
|
||||||
|
|
@ -271,6 +320,11 @@ case class SerializedActorRef(path: String) {
|
||||||
*/
|
*/
|
||||||
trait MinimalActorRef extends InternalActorRef {
|
trait MinimalActorRef extends InternalActorRef {
|
||||||
|
|
||||||
|
def getParent: InternalActorRef = Nobody
|
||||||
|
def getChild(name: Iterable[String]): InternalActorRef =
|
||||||
|
if (name.size == 1 && name.head.isEmpty) this
|
||||||
|
else Nobody
|
||||||
|
|
||||||
//FIXME REMOVE THIS, ticket #1416
|
//FIXME REMOVE THIS, ticket #1416
|
||||||
//FIXME REMOVE THIS, ticket #1415
|
//FIXME REMOVE THIS, ticket #1415
|
||||||
def suspend(): Unit = ()
|
def suspend(): Unit = ()
|
||||||
|
|
@ -341,7 +395,13 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
|
||||||
private def writeReplace(): AnyRef = DeadLetterActorRef.serialized
|
private def writeReplace(): AnyRef = DeadLetterActorRef.serialized
|
||||||
}
|
}
|
||||||
|
|
||||||
class AskActorRef(val path: ActorPath, provider: ActorRefProvider, deathWatch: DeathWatch, timeout: Timeout, val dispatcher: MessageDispatcher) extends MinimalActorRef {
|
class AskActorRef(
|
||||||
|
val path: ActorPath,
|
||||||
|
override val getParent: InternalActorRef,
|
||||||
|
deathWatch: DeathWatch,
|
||||||
|
timeout: Timeout,
|
||||||
|
val dispatcher: MessageDispatcher) extends MinimalActorRef {
|
||||||
|
|
||||||
final val result = new DefaultPromise[Any](timeout)(dispatcher)
|
final val result = new DefaultPromise[Any](timeout)(dispatcher)
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,13 @@ import java.io.Closeable
|
||||||
*/
|
*/
|
||||||
trait ActorRefProvider {
|
trait ActorRefProvider {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reference to the supervisor of guardian and systemGuardian; this is
|
||||||
|
* exposed so that the ActorSystemImpl can use it as lookupRoot, i.e.
|
||||||
|
* for anchoring absolute actor look-ups.
|
||||||
|
*/
|
||||||
|
def rootGuardian: InternalActorRef
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reference to the supervisor used for all top-level user actors.
|
* Reference to the supervisor used for all top-level user actors.
|
||||||
*/
|
*/
|
||||||
|
|
@ -85,17 +92,18 @@ trait ActorRefProvider {
|
||||||
/**
|
/**
|
||||||
* Create actor reference for a specified local or remote path, which will
|
* Create actor reference for a specified local or remote path, which will
|
||||||
* be parsed using java.net.URI. If no such actor exists, it will be
|
* be parsed using java.net.URI. If no such actor exists, it will be
|
||||||
* (equivalent to) a dead letter reference.
|
* (equivalent to) a dead letter reference. If `s` is a relative URI, resolve
|
||||||
|
* it relative to the given ref.
|
||||||
*/
|
*/
|
||||||
def actorFor(s: String): InternalActorRef
|
def actorFor(ref: InternalActorRef, s: String): InternalActorRef
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create actor reference for the specified child path starting at the root
|
* Create actor reference for the specified child path starting at the
|
||||||
* guardian. This method always returns an actor which is “logically local”,
|
* given starting point. This method always returns an actor which is “logically local”,
|
||||||
* i.e. it cannot be used to obtain a reference to an actor which is not
|
* i.e. it cannot be used to obtain a reference to an actor which is not
|
||||||
* physically or logically attached to this actor system.
|
* physically or logically attached to this actor system.
|
||||||
*/
|
*/
|
||||||
def actorFor(p: Iterable[String]): InternalActorRef
|
def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef
|
||||||
|
|
||||||
private[akka] def createDeathWatch(): DeathWatch
|
private[akka] def createDeathWatch(): DeathWatch
|
||||||
|
|
||||||
|
|
@ -127,6 +135,8 @@ trait ActorRefFactory {
|
||||||
*/
|
*/
|
||||||
protected def guardian: InternalActorRef
|
protected def guardian: InternalActorRef
|
||||||
|
|
||||||
|
protected def lookupRoot: InternalActorRef
|
||||||
|
|
||||||
protected def randomName(): String
|
protected def randomName(): String
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -171,9 +181,12 @@ trait ActorRefFactory {
|
||||||
|
|
||||||
def actorFor(path: ActorPath): ActorRef = provider.actorFor(path)
|
def actorFor(path: ActorPath): ActorRef = provider.actorFor(path)
|
||||||
|
|
||||||
def actorFor(path: String): ActorRef = provider.actorFor(path)
|
def actorFor(path: String): ActorRef = provider.actorFor(lookupRoot, path)
|
||||||
|
|
||||||
def actorFor(path: Iterable[String]): ActorRef = provider.actorFor(path)
|
/**
|
||||||
|
* For maximum performance use a collection with efficient head & tail operations.
|
||||||
|
*/
|
||||||
|
def actorFor(path: Iterable[String]): ActorRef = provider.actorFor(lookupRoot, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
class ActorRefProviderException(message: String) extends AkkaException(message)
|
class ActorRefProviderException(message: String) extends AkkaException(message)
|
||||||
|
|
@ -203,11 +216,11 @@ class LocalActorRefProvider(
|
||||||
*/
|
*/
|
||||||
private val tempNumber = new AtomicLong
|
private val tempNumber = new AtomicLong
|
||||||
|
|
||||||
private def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement())
|
private def tempName() = Helpers.base64(tempNumber.getAndIncrement())
|
||||||
|
|
||||||
private val tempNode = rootPath / "temp"
|
private val tempNode = rootPath / "temp"
|
||||||
|
|
||||||
def tempPath = tempNode / tempName
|
def tempPath() = tempNode / tempName
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Top-level anchor for the supervision hierarchy of this actor system. Will
|
* Top-level anchor for the supervision hierarchy of this actor system. Will
|
||||||
|
|
@ -281,10 +294,36 @@ class LocalActorRefProvider(
|
||||||
def dispatcher: MessageDispatcher = system.dispatcher
|
def dispatcher: MessageDispatcher = system.dispatcher
|
||||||
|
|
||||||
lazy val terminationFuture: DefaultPromise[Unit] = new DefaultPromise[Unit](Timeout.never)(dispatcher)
|
lazy val terminationFuture: DefaultPromise[Unit] = new DefaultPromise[Unit](Timeout.never)(dispatcher)
|
||||||
lazy val rootGuardian: InternalActorRef = new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true)
|
|
||||||
|
lazy val rootGuardian: InternalActorRef = new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true) {
|
||||||
|
override def getParent: InternalActorRef = this
|
||||||
|
override def getSingleChild(name: String): InternalActorRef = {
|
||||||
|
name match {
|
||||||
|
case "temp" ⇒ tempContainer
|
||||||
|
case _ ⇒ super.getSingleChild(name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
lazy val guardian: InternalActorRef = actorOf(system, guardianProps, rootGuardian, "user", true)
|
lazy val guardian: InternalActorRef = actorOf(system, guardianProps, rootGuardian, "user", true)
|
||||||
|
|
||||||
lazy val systemGuardian: InternalActorRef = actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, "system", true)
|
lazy val systemGuardian: InternalActorRef = actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, "system", true)
|
||||||
|
|
||||||
|
lazy val tempContainer = new MinimalActorRef {
|
||||||
|
val children = new ConcurrentHashMap[String, AskActorRef]
|
||||||
|
def path = tempNode
|
||||||
|
override def getParent = rootGuardian
|
||||||
|
override def getChild(name: Iterable[String]): InternalActorRef = {
|
||||||
|
val c = children.get(name.head)
|
||||||
|
if (c == null) Nobody
|
||||||
|
else {
|
||||||
|
val t = name.tail
|
||||||
|
if (t.isEmpty) c
|
||||||
|
else c.getChild(t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
val deathWatch = createDeathWatch()
|
val deathWatch = createDeathWatch()
|
||||||
|
|
||||||
def init(_system: ActorSystemImpl) {
|
def init(_system: ActorSystemImpl) {
|
||||||
|
|
@ -294,25 +333,25 @@ class LocalActorRefProvider(
|
||||||
deathWatch.subscribe(rootGuardian, systemGuardian)
|
deathWatch.subscribe(rootGuardian, systemGuardian)
|
||||||
}
|
}
|
||||||
|
|
||||||
def actorFor(path: String): InternalActorRef = path match {
|
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
|
||||||
case LocalActorPath(address, elems) if address == rootPath.address ⇒
|
case RelativeActorPath(elems) ⇒
|
||||||
findInTree(rootGuardian.asInstanceOf[LocalActorRef], elems)
|
if (elems.isEmpty) deadLetters
|
||||||
|
else if (elems.head.isEmpty) actorFor(rootGuardian, elems.tail)
|
||||||
|
else actorFor(ref, elems)
|
||||||
|
case LocalActorPath(address, elems) if address == rootPath.address ⇒ actorFor(rootGuardian, elems)
|
||||||
case _ ⇒ deadLetters
|
case _ ⇒ deadLetters
|
||||||
}
|
}
|
||||||
|
|
||||||
def actorFor(path: ActorPath): InternalActorRef = findInTree(rootGuardian.asInstanceOf[LocalActorRef], path.pathElements)
|
def actorFor(path: ActorPath): InternalActorRef =
|
||||||
|
if (path.root == rootPath) actorFor(rootGuardian, path.pathElements)
|
||||||
|
else deadLetters
|
||||||
|
|
||||||
def actorFor(path: Iterable[String]): InternalActorRef = findInTree(rootGuardian.asInstanceOf[LocalActorRef], path)
|
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef =
|
||||||
|
if (path.isEmpty) deadLetters
|
||||||
@tailrec
|
else ref.getChild(path) match {
|
||||||
private def findInTree(start: LocalActorRef, path: Iterable[String]): InternalActorRef = {
|
case Nobody ⇒ deadLetters
|
||||||
if (path.isEmpty) start
|
case x ⇒ x
|
||||||
else start.underlying.getChild(path.head) match {
|
|
||||||
case null ⇒ deadLetters
|
|
||||||
case child: LocalActorRef ⇒ findInTree(child, path.tail)
|
|
||||||
case _ ⇒ deadLetters
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, name: String, systemService: Boolean): InternalActorRef = {
|
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, name: String, systemService: Boolean): InternalActorRef = {
|
||||||
val path = supervisor.path / name
|
val path = supervisor.path / name
|
||||||
|
|
@ -377,7 +416,14 @@ class LocalActorRefProvider(
|
||||||
case t if t.duration.length <= 0 ⇒
|
case t if t.duration.length <= 0 ⇒
|
||||||
new DefaultPromise[Any](0)(dispatcher) //Abort early if nonsensical timeout
|
new DefaultPromise[Any](0)(dispatcher) //Abort early if nonsensical timeout
|
||||||
case t ⇒
|
case t ⇒
|
||||||
val a = new AskActorRef(tempPath, this, deathWatch, t, dispatcher)
|
val path = tempPath()
|
||||||
|
val name = path.name
|
||||||
|
val a = new AskActorRef(path, tempContainer, deathWatch, t, dispatcher) {
|
||||||
|
override def whenDone() {
|
||||||
|
tempContainer.children.remove(name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tempContainer.children.put(name, a)
|
||||||
recipient.tell(message, a)
|
recipient.tell(message, a)
|
||||||
a.result
|
a.result
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -201,6 +201,11 @@ abstract class ActorSystem extends ActorRefFactory {
|
||||||
*/
|
*/
|
||||||
def /(name: String): ActorPath
|
def /(name: String): ActorPath
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a path below the application guardian to be used with [[ActorSystem.actorFor]].
|
||||||
|
*/
|
||||||
|
def /(name: Iterable[String]): ActorPath
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start-up time in milliseconds since the epoch.
|
* Start-up time in milliseconds since the epoch.
|
||||||
*/
|
*/
|
||||||
|
|
@ -382,6 +387,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
|
||||||
implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher
|
implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher
|
||||||
|
|
||||||
def terminationFuture: Future[Unit] = provider.terminationFuture
|
def terminationFuture: Future[Unit] = provider.terminationFuture
|
||||||
|
def lookupRoot: InternalActorRef = provider.rootGuardian
|
||||||
def guardian: InternalActorRef = provider.guardian
|
def guardian: InternalActorRef = provider.guardian
|
||||||
def systemGuardian: InternalActorRef = provider.systemGuardian
|
def systemGuardian: InternalActorRef = provider.systemGuardian
|
||||||
def deathWatch: DeathWatch = provider.deathWatch
|
def deathWatch: DeathWatch = provider.deathWatch
|
||||||
|
|
@ -392,6 +398,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
|
||||||
override protected def randomName(): String = Helpers.base64(nextName.incrementAndGet())
|
override protected def randomName(): String = Helpers.base64(nextName.incrementAndGet())
|
||||||
|
|
||||||
def /(actorName: String): ActorPath = guardian.path / actorName
|
def /(actorName: String): ActorPath = guardian.path / actorName
|
||||||
|
def /(path: Iterable[String]): ActorPath = guardian.path / path
|
||||||
|
|
||||||
private lazy val _start: this.type = {
|
private lazy val _start: this.type = {
|
||||||
provider.init(this)
|
provider.init(this)
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,16 @@ case class LocalAddress(systemName: String) extends Address {
|
||||||
def hostPort = systemName
|
def hostPort = systemName
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object RelativeActorPath {
|
||||||
|
def unapply(addr: String): Option[Iterable[String]] = {
|
||||||
|
try {
|
||||||
|
val uri = new URI(addr)
|
||||||
|
if (uri.isAbsolute) None
|
||||||
|
else Some(ActorPath.split(uri.getPath))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
object LocalActorPath {
|
object LocalActorPath {
|
||||||
def unapply(addr: String): Option[(LocalAddress, Iterable[String])] = {
|
def unapply(addr: String): Option[(LocalAddress, Iterable[String])] = {
|
||||||
try {
|
try {
|
||||||
|
|
@ -30,7 +40,7 @@ object LocalActorPath {
|
||||||
if (uri.getUserInfo != null) return None
|
if (uri.getUserInfo != null) return None
|
||||||
if (uri.getHost == null) return None
|
if (uri.getHost == null) return None
|
||||||
if (uri.getPath == null) return None
|
if (uri.getPath == null) return None
|
||||||
Some(LocalAddress(uri.getHost), uri.getPath.split("/").drop(1))
|
Some(LocalAddress(uri.getHost), ActorPath.split(uri.getPath).drop(1))
|
||||||
} catch {
|
} catch {
|
||||||
case _: URISyntaxException ⇒ None
|
case _: URISyntaxException ⇒ None
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,7 @@ object RemoteActorPath {
|
||||||
if (uri.getHost == null) return None
|
if (uri.getHost == null) return None
|
||||||
if (uri.getPort == -1) return None
|
if (uri.getPort == -1) return None
|
||||||
if (uri.getPath == null) return None
|
if (uri.getPath == null) return None
|
||||||
Some(RemoteAddress(uri.getUserInfo, uri.getHost, uri.getPort), uri.getPath.split("/").drop(1))
|
Some(RemoteAddress(uri.getUserInfo, uri.getHost, uri.getPort), ActorPath.split(uri.getPath).drop(1))
|
||||||
} catch {
|
} catch {
|
||||||
case _: URISyntaxException ⇒ None
|
case _: URISyntaxException ⇒ None
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -141,7 +141,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
||||||
message.getActorPath match {
|
message.getActorPath match {
|
||||||
case RemoteActorPath(addr, elems) if addr == remoteAddress && elems.size > 0 ⇒
|
case RemoteActorPath(addr, elems) if addr == remoteAddress && elems.size > 0 ⇒
|
||||||
val name = elems.last
|
val name = elems.last
|
||||||
systemImpl.provider.actorFor(elems.dropRight(1)) match {
|
systemImpl.provider.actorFor(systemImpl.lookupRoot, elems.dropRight(1)) match {
|
||||||
case x if x eq system.deadLetters ⇒
|
case x if x eq system.deadLetters ⇒
|
||||||
log.error("Parent actor does not exist, ignoring remote system daemon command [{}]", message)
|
log.error("Parent actor does not exist, ignoring remote system daemon command [{}]", message)
|
||||||
case parent ⇒
|
case parent ⇒
|
||||||
|
|
@ -246,7 +246,7 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo
|
||||||
val provider = remote.system.asInstanceOf[ActorSystemImpl].provider
|
val provider = remote.system.asInstanceOf[ActorSystemImpl].provider
|
||||||
|
|
||||||
lazy val sender: ActorRef =
|
lazy val sender: ActorRef =
|
||||||
if (input.hasSender) provider.actorFor(input.getSender.getPath)
|
if (input.hasSender) provider.actorFor(provider.rootGuardian, input.getSender.getPath)
|
||||||
else remote.system.deadLetters
|
else remote.system.deadLetters
|
||||||
|
|
||||||
lazy val recipient: ActorRef = remote.system.actorFor(input.getRecipient.getPath)
|
lazy val recipient: ActorRef = remote.system.actorFor(input.getRecipient.getPath)
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,7 @@ class RemoteActorRefProvider(
|
||||||
val log = Logging(eventStream, "RemoteActorRefProvider")
|
val log = Logging(eventStream, "RemoteActorRefProvider")
|
||||||
|
|
||||||
def deathWatch = local.deathWatch
|
def deathWatch = local.deathWatch
|
||||||
|
def rootGuardian = local.rootGuardian
|
||||||
def guardian = local.guardian
|
def guardian = local.guardian
|
||||||
def systemGuardian = local.systemGuardian
|
def systemGuardian = local.systemGuardian
|
||||||
def nodename = remoteExtension.NodeName
|
def nodename = remoteExtension.NodeName
|
||||||
|
|
@ -181,8 +182,8 @@ class RemoteActorRefProvider(
|
||||||
}
|
}
|
||||||
|
|
||||||
def actorFor(path: ActorPath): InternalActorRef = local.actorFor(path)
|
def actorFor(path: ActorPath): InternalActorRef = local.actorFor(path)
|
||||||
def actorFor(path: String): InternalActorRef = local.actorFor(path)
|
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = local.actorFor(ref, path)
|
||||||
def actorFor(path: Iterable[String]): InternalActorRef = local.actorFor(path)
|
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path)
|
||||||
|
|
||||||
// TODO remove me
|
// TODO remove me
|
||||||
val optimizeLocal = new AtomicBoolean(true)
|
val optimizeLocal = new AtomicBoolean(true)
|
||||||
|
|
@ -267,13 +268,13 @@ private[akka] case class RemoteActorRef private[akka] (
|
||||||
loader: Option[ClassLoader])
|
loader: Option[ClassLoader])
|
||||||
extends InternalActorRef {
|
extends InternalActorRef {
|
||||||
|
|
||||||
|
// FIXME
|
||||||
|
def getParent = Nobody
|
||||||
|
def getChild(name: Iterable[String]) = Nobody
|
||||||
|
|
||||||
@volatile
|
@volatile
|
||||||
private var running: Boolean = true
|
private var running: Boolean = true
|
||||||
|
|
||||||
def name = path.name
|
|
||||||
|
|
||||||
def address = remoteAddress + path.toString
|
|
||||||
|
|
||||||
def isTerminated: Boolean = !running
|
def isTerminated: Boolean = !running
|
||||||
|
|
||||||
def sendSystemMessage(message: SystemMessage): Unit = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
|
def sendSystemMessage(message: SystemMessage): Unit = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue