diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index b485aa0931..9edf60b57f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -278,32 +278,19 @@ class ActorRefSpec extends AkkaSpec { " Use akka.serialization.Serialization.app.withValue(akkaApplication) { ... }" } - "must throw exception on deserialize if not present in local registry and remoting is not enabled" in { - val latch = new CountDownLatch(1) - val a = actorOf(new InnerActor { - override def postStop { - // app.registry.unregister(self) - latch.countDown - } - }) - - val inetAddress = app.defaultAddress - - val expectedSerializedRepresentation = new SerializedActorRef(a.address, inetAddress) - + "must throw exception on deserialize if not present in actor hierarchy (and remoting is not enabled)" in { import java.io._ val baos = new ByteArrayOutputStream(8192 * 32) val out = new ObjectOutputStream(baos) - out.writeObject(a) + val serialized = SerializedActorRef(app.hostname, app.port, "/this/path/does/not/exist") + + out.writeObject(serialized) out.flush out.close - a.stop() - latch.await(5, TimeUnit.SECONDS) must be === true - Serialization.app.withValue(app) { val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) (intercept[java.lang.IllegalStateException] { diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index d6693dbeca..b3f6624283 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -13,12 +13,12 @@ class DeployerSpec extends AkkaSpec { "A Deployer" must { "be able to parse 'akka.actor.deployment._' config elements" in { - val deployment = app.provider.deployer.lookupInConfig("service-ping") + val deployment = app.provider.deployer.lookupInConfig("/app/service-ping") deployment must be('defined) deployment must equal(Some( Deploy( - "service-ping", + "/app/service-ping", None, RoundRobin, NrOfInstances(3), diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index 05716ea04b..5bf3fcf9d7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -217,13 +217,13 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true app.mainbus.subscribe(testActor, classOf[Logging.Debug]) fsm ! "go" expectMsgPF(1 second, hint = "processing Event(go,null)") { - case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[testActor") ⇒ true + case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[" + app.address + "/sys/testActor") ⇒ true } expectMsg(1 second, Logging.Debug(fsm, "setting timer 't'/1500 milliseconds: Shutdown")) expectMsg(1 second, Logging.Debug(fsm, "transition 1 -> 2")) fsm ! "stop" expectMsgPF(1 second, hint = "processing Event(stop,null)") { - case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[testActor") ⇒ true + case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[" + app.address + "/sys/testActor") ⇒ true } expectMsgAllOf(1 second, Logging.Debug(fsm, "canceling timer 't'"), Normal) expectNoMsg(1 second) diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala index 891b09ce5b..a4115fce2b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala @@ -139,11 +139,15 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd app.mainbus.subscribe(testActor, classOf[Logging.Debug]) app.mainbus.subscribe(testActor, classOf[Logging.Error]) within(3 seconds) { + val lifecycleGuardian = appLifecycle.guardian val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) - expectMsgPF() { - case Logging.Debug(`supervisor`, msg: String) if msg startsWith "started" ⇒ - } + val supervisorSet = receiveWhile(messages = 2) { + case Logging.Debug(`lifecycleGuardian`, msg: String) if msg startsWith "now supervising" ⇒ 1 + case Logging.Debug(`supervisor`, msg: String) if msg startsWith "started" ⇒ 2 + }.toSet + expectNoMsg(Duration.Zero) + assert(supervisorSet == Set(1, 2), supervisorSet + " was not Set(1, 2)") val actor = new TestActorRef[TestLogActor](app, Props[TestLogActor], supervisor, "none") diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index a441ca8651..153bdec586 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -161,18 +161,14 @@ object ActorModelSpec { } } - def assertCountDown(latch: CountDownLatch, wait: Long, hint: AnyRef) { - try { - assert(latch.await(wait, TimeUnit.MILLISECONDS) === true) - } catch { - case e ⇒ - System.err.println("assertCountDown failed was: " + latch.getCount) - throw e - } + def assertCountDown(latch: CountDownLatch, wait: Long, hint: String) { + if (!latch.await(wait, TimeUnit.MILLISECONDS)) + fail("Failed to count down within " + wait + " millis (count at " + latch.getCount + "). " + hint) } - def assertNoCountDown(latch: CountDownLatch, wait: Long, hint: AnyRef) { - assert(latch.await(wait, TimeUnit.MILLISECONDS) === false) + def assertNoCountDown(latch: CountDownLatch, wait: Long, hint: String) { + if (latch.await(wait, TimeUnit.MILLISECONDS)) + fail("Expected count down to fail after " + wait + " millis. " + hint) } def statsFor(actorRef: ActorRef, dispatcher: MessageDispatcher = null) = @@ -354,18 +350,21 @@ abstract class ActorModelSpec extends AkkaSpec { def flood(num: Int) { val cachedMessage = CountDownNStop(new CountDownLatch(num)) + val stopLatch = new CountDownLatch(num) + val waitTime = (30 seconds).dilated.toMillis val boss = actorOf(Props(context ⇒ { case "run" ⇒ - for (_ ← 1 to num) context.actorOf(props) ! cachedMessage + for (_ ← 1 to num) { + val child = context.actorOf(props) + context.self startsMonitoring child + child ! cachedMessage + } + case Terminated(child) ⇒ + stopLatch.countDown() }).withDispatcher(wavesSupervisorDispatcher(dispatcher))) boss ! "run" - try { - assertCountDown(cachedMessage.latch, (20 seconds).dilated.toMillis, "Should process " + num + " countdowns") - } catch { - case e ⇒ - System.err.println(this.getClass.getName + " error: " + e.getMessage + " missing count downs == " + cachedMessage.latch.getCount() + " out of " + num) - throw e - } + assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num) + assertCountDown(stopLatch, waitTime, "Expected all children to stop") boss.stop() } for (run ← 1 to 3) { diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index adf251c766..424e897fa4 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -14,11 +14,11 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { "round robin router" must { "be able to shut down its instance" in { - val address = "round-robin-0" + val path = app / "round-robin-0" app.provider.deployer.deploy( Deploy( - address, + path.toString, None, RoundRobin, NrOfInstances(5), @@ -35,7 +35,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { override def postStop() { stopLatch.countDown() } - }), address) + }), path.name) actor ! "hello" actor ! "hello" @@ -49,11 +49,11 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { } "deliver messages in a round robin fashion" in { - val address = "round-robin-1" + val path = app / "round-robin-1" app.provider.deployer.deploy( Deploy( - address, + path.toString, None, RoundRobin, NrOfInstances(10), @@ -75,7 +75,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { case "hit" ⇒ sender ! id case "end" ⇒ doneLatch.countDown() } - }), address) + }), path.name) for (i ← 0 until iterationCount) { for (k ← 0 until connectionCount) { @@ -93,11 +93,11 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { } "deliver a broadcast message using the !" in { - val address = "round-robin-2" + val path = app / "round-robin-2" app.provider.deployer.deploy( Deploy( - address, + path.toString, None, RoundRobin, NrOfInstances(5), @@ -114,7 +114,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { override def postStop() { stopLatch.countDown() } - }), address) + }), path.name) actor ! Broadcast("hello") helloLatch.await(5, TimeUnit.SECONDS) must be(true) @@ -127,11 +127,11 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { "random router" must { "be able to shut down its instance" in { - val address = "random-0" + val path = app / "random-0" app.provider.deployer.deploy( Deploy( - address, + path.toString, None, Random, NrOfInstances(7), @@ -147,7 +147,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { override def postStop() { stopLatch.countDown() } - }), address) + }), path.name) actor ! "hello" actor ! "hello" @@ -160,11 +160,11 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { } "deliver messages in a random fashion" in { - val address = "random-1" + val path = app / "random-1" app.provider.deployer.deploy( Deploy( - address, + path.toString, None, Random, NrOfInstances(10), @@ -186,7 +186,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { case "hit" ⇒ sender ! id case "end" ⇒ doneLatch.countDown() } - }), address) + }), path.name) for (i ← 0 until iterationCount) { for (k ← 0 until connectionCount) { @@ -204,11 +204,11 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { } "deliver a broadcast message using the !" in { - val address = "random-2" + val path = app / "random-2" app.provider.deployer.deploy( Deploy( - address, + path.toString, None, Random, NrOfInstances(6), @@ -225,7 +225,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec { override def postStop() { stopLatch.countDown() } - }), address) + }), path.name) actor ! Broadcast("hello") helloLatch.await(5, TimeUnit.SECONDS) must be(true) diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index 7ec88972fe..8eee358ee9 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -167,6 +167,8 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor def port: Int = defaultAddress.getPort + def address: String = hostname + ":" + port.toString + // this provides basic logging (to stdout) until .start() is called below val mainbus = new MainBus(DebugMainBus) mainbus.startStdoutLogger(AkkaConfig) @@ -180,6 +182,11 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor // TODO think about memory consistency effects when doing funky stuff inside constructor val reflective = new ReflectiveAccess(this) + /** + * The root actor path for this application. + */ + val root: ActorPath = new RootActorPath(this) + // TODO think about memory consistency effects when doing funky stuff inside constructor val provider: ActorRefProvider = reflective.createProvider @@ -205,14 +212,14 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor } private val guardianProps = Props(new Guardian).withFaultHandler(guardianFaultHandlingStrategy) - private val guardianInChief: ActorRef = - provider.actorOf(guardianProps, provider.theOneWhoWalksTheBubblesOfSpaceTime, "GuardianInChief", true) + private val rootGuardian: ActorRef = + provider.actorOf(guardianProps, provider.theOneWhoWalksTheBubblesOfSpaceTime, root, true) protected[akka] val guardian: ActorRef = - provider.actorOf(guardianProps, guardianInChief, "ApplicationSupervisor", true) + provider.actorOf(guardianProps, rootGuardian, "app", true) protected[akka] val systemGuardian: ActorRef = - provider.actorOf(guardianProps.withCreator(new SystemGuardian), guardianInChief, "SystemSupervisor", true) + provider.actorOf(guardianProps.withCreator(new SystemGuardian), rootGuardian, "sys", true) // TODO think about memory consistency effects when doing funky stuff inside constructor val deadLetters = new DeadLetterActorRef(this) @@ -221,7 +228,7 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor // chain death watchers so that killing guardian stops the application deathWatch.subscribe(systemGuardian, guardian) - deathWatch.subscribe(guardianInChief, systemGuardian) + deathWatch.subscribe(rootGuardian, systemGuardian) // this starts the reaper actor and the user-configured logging subscribers, which are also actors mainbus.start(this) @@ -239,6 +246,11 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor val scheduler = new DefaultScheduler terminationFuture.onComplete(_ ⇒ scheduler.shutdown()) + /** + * Create an actor path under the application supervisor (/app). + */ + def /(actorName: String): ActorPath = guardian.path / actorName + // TODO shutdown all that other stuff, whatever that may be def stop(): Unit = { guardian.stop() diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index c7f93b493f..6029a700f0 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -213,7 +213,7 @@ trait Actor { * Stores the context for this actor, including self, sender, and hotswap. */ @transient - private[akka] implicit val context: ActorContext = { + protected[akka] implicit val context: ActorContext = { val contextStack = ActorCell.contextStack.get def noContextError = @@ -441,23 +441,3 @@ trait Actor { private val processingBehavior = receive //ProcessingBehavior is the original behavior } -/** - * Helper methods and fields for working with actor addresses. - * Meant for internal use. - * - * @author Jonas Bonér - */ -object Address { - - val clusterActorRefPrefix = "cluster-actor-ref.".intern - - private val validAddressPattern = java.util.regex.Pattern.compile("[0-9a-zA-Z\\-\\_\\$\\.]+") - - def validate(address: String) { - if (!validAddressPattern.matcher(address).matches) { - val e = new IllegalArgumentException("Address [" + address + "] is not valid, need to follow pattern: " + validAddressPattern.pattern) - throw e - } - } -} - diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 16f1ec7535..f16a9867c8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -18,7 +18,7 @@ import akka.event.Logging.{ Debug, Warning, Error } * Exposes contextual information for the actor and the current message. * TODO: everything here for current compatibility - could be limited more */ -private[akka] trait ActorContext extends ActorRefFactory with TypedActorFactory { +trait ActorContext extends ActorRefFactory with TypedActorFactory { def self: ActorRef with ScalaActorRef @@ -56,7 +56,9 @@ private[akka] object ActorCell { override def initialValue = Stack[ActorContext]() } - val emptyChildren = TreeMap[ActorRef, ChildRestartStats]() + val emptyChildrenRefs = TreeMap[String, ActorRef]() + + val emptyChildrenStats = TreeMap[ActorRef, ChildRestartStats]() } //vars don't need volatile since it's protected with the mailbox status @@ -79,7 +81,9 @@ private[akka] class ActorCell( var futureTimeout: Option[ScheduledFuture[AnyRef]] = None - var _children = emptyChildren //Reuse same empty instance to avoid allocating new instance of the Ordering and the actual empty instance for every actor + var childrenRefs = emptyChildrenRefs + + var childrenStats = emptyChildrenStats var currentMessage: Envelope = null @@ -125,7 +129,13 @@ private[akka] class ActorCell( subject } - final def children: Iterable[ActorRef] = _children.keys + final def children: Iterable[ActorRef] = childrenStats.keys + + final def getChild(name: String): Option[ActorRef] = { + val isClosed = mailbox.isClosed // fence plus volatile read + if (isClosed) None + else childrenRefs.get(name) + } final def postMessageToMailbox(message: Any, sender: ActorRef): Unit = dispatcher.dispatch(this, Envelope(message, sender)) @@ -210,7 +220,7 @@ private[akka] class ActorCell( def terminate() { receiveTimeout = None cancelReceiveTimeout - app.provider.evict(self.address) + app.provider.evict(self.path.toString) dispatcher.detach(this) try { @@ -222,7 +232,8 @@ private[akka] class ActorCell( //Stop supervised actors val c = children if (c.nonEmpty) { - _children = TreeMap.empty + childrenRefs = emptyChildrenRefs + childrenStats = emptyChildrenStats for (child ← c) child.stop() } } @@ -238,9 +249,10 @@ private[akka] class ActorCell( } def supervise(child: ActorRef): Unit = { - val links = _children - if (!links.contains(child)) { - _children = _children.updated(child, ChildRestartStats()) + val stats = childrenStats + if (!stats.contains(child)) { + childrenRefs = childrenRefs.updated(child.name, child) + childrenStats = childrenStats.updated(child, ChildRestartStats()) if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "now supervising " + child)) } else app.mainbus.publish(Warning(self, "Already supervising " + child)) } @@ -311,13 +323,14 @@ private[akka] class ActorCell( } } - final def handleFailure(fail: Failed): Unit = _children.get(fail.actor) match { - case Some(stats) ⇒ if (!props.faultHandler.handleFailure(fail, stats, _children)) throw fail.cause + final def handleFailure(fail: Failed): Unit = childrenStats.get(fail.actor) match { + case Some(stats) ⇒ if (!props.faultHandler.handleFailure(fail, stats, childrenStats)) throw fail.cause case None ⇒ app.mainbus.publish(Warning(self, "dropping " + fail + " from unknown child")) } final def handleChildTerminated(child: ActorRef): Unit = { - _children -= child + childrenRefs -= child.name + childrenStats -= child props.faultHandler.handleChildTerminated(child, children) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala new file mode 100644 index 0000000000..460b71a28d --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala @@ -0,0 +1,136 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.actor + +import akka.AkkaApplication + +object ActorPath { + final val separator = "/" + + val pattern = """(/[0-9a-zA-Z\-\_\$\.]+)+""".r.pattern + + /** + * Create an actor path from a string. + */ + def apply(app: AkkaApplication, path: String): ActorPath = + apply(app, split(path)) + + /** + * Create an actor path from an iterable. + */ + def apply(app: AkkaApplication, path: Iterable[String]): ActorPath = + path.foldLeft(app.root)(_ / _) + + /** + * Split a string path into an iterable. + */ + def split(path: String): Iterable[String] = + if (path.startsWith(separator)) + path.substring(1).split(separator) + else + path.split(separator) + + /** + * Join an iterable path into a string. + */ + def join(path: Iterable[String]): String = + path.mkString(separator, separator, "") + + /** + * Is this string representation of a path valid? + */ + def valid(path: String): Boolean = + pattern.matcher(path).matches + + /** + * Validate a path. Moved here from Address.validate. + * Throws an IllegalArgumentException if the path is invalid. + */ + def validate(path: String): Unit = { + if (!valid(path)) + throw new IllegalArgumentException("Path [" + path + "] is not valid. Needs to follow this pattern: " + pattern) + } +} + +/** + * Actor path is a unique path to an actor that shows the creation path + * up through the actor tree to the root actor. + */ +trait ActorPath { + /** + * The akka application for this path. + */ + def app: AkkaApplication + + /** + * The name of the actor that this path refers to. + */ + def name: String + + /** + * The path for the parent actor. + */ + def parent: ActorPath + + /** + * Create a new child actor path. + */ + def /(child: String): ActorPath + + /** + * Find the ActorRef for this path. + */ + def ref: Option[ActorRef] + + /** + * String representation of this path. Different from toString for root path. + */ + def string: String + + /** + * Sequence of names for this path. + */ + def path: Iterable[String] + + /** + * Is this the root path? + */ + def isRoot: Boolean +} + +class RootActorPath(val app: AkkaApplication) extends ActorPath { + + def name: String = "/" + + def parent: ActorPath = this + + def /(child: String): ActorPath = new ChildActorPath(app, this, child) + + def ref: Option[ActorRef] = app.actorFor(path) + + def string: String = "" + + def path: Iterable[String] = Iterable.empty + + def isRoot: Boolean = true + + override def toString = ActorPath.separator +} + +class ChildActorPath(val app: AkkaApplication, val parent: ActorPath, val name: String) extends ActorPath { + + def /(child: String): ActorPath = new ChildActorPath(app, this, child) + + def ref: Option[ActorRef] = app.actorFor(path) + + def string: String = parent.string + ActorPath.separator + name + + def path: Iterable[String] = parent.path ++ Iterable(name) + + def isRoot: Boolean = false + + override def toString = string +} + diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 6a3392f910..ab1ad1fea6 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -49,7 +49,17 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable // Only mutable for RemoteServer in order to maintain identity across nodes /** - * Returns the address for the actor. + * Returns the name for this actor. Locally unique (across siblings). + */ + def name: String + + /** + * Returns the path for this actor (from this actor up to the root actor). + */ + def path: ActorPath + + /** + * Returns the absolute address for this actor in the form hostname:port/path/to/actor. */ def address: String @@ -154,17 +164,15 @@ class LocalActorRef private[akka] ( _app: AkkaApplication, props: Props, _supervisor: ActorRef, - _givenAddress: String, + val path: ActorPath, val systemService: Boolean = false, - private[akka] val uuid: Uuid = newUuid, receiveTimeout: Option[Long] = None, hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap) extends ActorRef with ScalaActorRef { - final val address: String = _givenAddress match { - case null | Props.randomAddress ⇒ uuid.toString - case other ⇒ other - } + def name = path.name + + def address: String = _app.address + path.toString private[this] val actorCell = new ActorCell(_app, this, props, _supervisor, receiveTimeout, hotswap) actorCell.start() @@ -283,10 +291,10 @@ trait ScalaActorRef { ref: ActorRef ⇒ * Memento pattern for serializing ActorRefs transparently */ -case class SerializedActorRef(address: String, hostname: String, port: Int) { +case class SerializedActorRef(hostname: String, port: Int, path: String) { import akka.serialization.Serialization.app - def this(address: String, inet: InetSocketAddress) = this(address, inet.getAddress.getHostAddress, inet.getPort) + def this(inet: InetSocketAddress, path: String) = this(inet.getAddress.getHostAddress, inet.getPort, path) @throws(classOf[java.io.ObjectStreamException]) def readResolve(): AnyRef = { @@ -331,7 +339,7 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef { trait MinimalActorRef extends ActorRef with ScalaActorRef { private[akka] val uuid: Uuid = newUuid() - def address = uuid.toString + def name: String = uuid.toString def startsMonitoring(actorRef: ActorRef): ActorRef = actorRef def stopsMonitoring(actorRef: ActorRef): ActorRef = actorRef @@ -365,7 +373,13 @@ object DeadLetterActorRef { class DeadLetterActorRef(val app: AkkaApplication) extends MinimalActorRef { val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(app.dispatcher) - override val address: String = "akka:internal:DeadLetterActorRef" + + override val name: String = "dead-letter" + + // FIXME (actor path): put this under the sys guardian supervisor + val path: ActorPath = app.root / "sys" / name + + def address: String = app.address + path.toString override def isShutdown(): Boolean = true @@ -384,6 +398,11 @@ class DeadLetterActorRef(val app: AkkaApplication) extends MinimalActorRef { abstract class AskActorRef(protected val app: AkkaApplication)(timeout: Timeout = app.AkkaConfig.ActorTimeout, dispatcher: MessageDispatcher = app.dispatcher) extends MinimalActorRef { final val result = new DefaultPromise[Any](timeout)(dispatcher) + // FIXME (actor path): put this under the tmp guardian supervisor + val path: ActorPath = app.root / "tmp" / name + + def address: String = app.address + path.toString + { val callback: Future[Any] ⇒ Unit = { _ ⇒ app.deathWatch.publish(Terminated(AskActorRef.this)); whenDone() } result onComplete callback diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index c7c23ef76c..3c1f185a69 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -13,26 +13,29 @@ import com.eaio.uuid.UUID import akka.AkkaException import akka.event.{ ActorClassification, DeathWatch, Logging } import akka.dispatch._ +import scala.annotation.tailrec /** * Interface for all ActorRef providers to implement. */ trait ActorRefProvider { - def actorOf(props: Props, supervisor: ActorRef, address: String): ActorRef = actorOf(props, supervisor, address, false) + def actorOf(props: Props, supervisor: ActorRef, name: String): ActorRef = actorOf(props, supervisor, name, false) - def actorOf(props: RoutedProps, supervisor: ActorRef, address: String): ActorRef + def actorOf(props: RoutedProps, supervisor: ActorRef, name: String): ActorRef - def actorFor(address: String): Option[ActorRef] + def actorFor(path: Iterable[String]): Option[ActorRef] /** * What deployer will be used to resolve deployment configuration? */ private[akka] def deployer: Deployer - private[akka] def actorOf(props: Props, supervisor: ActorRef, address: String, systemService: Boolean): ActorRef + private[akka] def actorOf(props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef - private[akka] def evict(address: String): Boolean + private[akka] def actorOf(props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef + + private[akka] def evict(path: String): Boolean private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] @@ -62,19 +65,19 @@ trait ActorRefFactory { */ protected def guardian: ActorRef - def actorOf(props: Props): ActorRef = actorOf(props, Props.randomAddress) + def actorOf(props: Props): ActorRef = actorOf(props, Props.randomName) /* * TODO this will have to go at some point, because creating two actors with * the same address can race on the cluster, and then you never know which * implementation wins */ - def actorOf(props: Props, address: String): ActorRef = provider.actorOf(props, guardian, address, false) + def actorOf(props: Props, name: String): ActorRef = provider.actorOf(props, guardian, name, false) def actorOf[T <: Actor](implicit m: Manifest[T]): ActorRef = actorOf(Props(m.erasure.asInstanceOf[Class[_ <: Actor]])) - def actorOf[T <: Actor](address: String)(implicit m: Manifest[T]): ActorRef = - actorOf(Props(m.erasure.asInstanceOf[Class[_ <: Actor]]), address) + def actorOf[T <: Actor](name: String)(implicit m: Manifest[T]): ActorRef = + actorOf(Props(m.erasure.asInstanceOf[Class[_ <: Actor]]), name) def actorOf[T <: Actor](clazz: Class[T]): ActorRef = actorOf(Props(clazz)) @@ -82,12 +85,13 @@ trait ActorRefFactory { def actorOf(creator: UntypedActorFactory): ActorRef = actorOf(Props(() ⇒ creator.create())) - def actorOf(props: RoutedProps): ActorRef = actorOf(props, Props.randomAddress) + def actorOf(props: RoutedProps): ActorRef = actorOf(props, Props.randomName) - def actorOf(props: RoutedProps, address: String): ActorRef = provider.actorOf(props, guardian, address) + def actorOf(props: RoutedProps, name: String): ActorRef = provider.actorOf(props, guardian, name) - def actorFor(address: String): Option[ActorRef] = provider.actorFor(address) + def actorFor(path: String): Option[ActorRef] = actorFor(ActorPath.split(path)) + def actorFor(path: Iterable[String]): Option[ActorRef] = provider.actorFor(path) } class ActorRefProviderException(message: String) extends AkkaException(message) @@ -110,9 +114,14 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { @volatile var stopped = false - override def address = app.name + ":BubbleWalker" + val name = app.name + "-bubble-walker" - override def toString = address + // FIXME (actor path): move the root path to the new root guardian + val path = app.root + + val address = app.address + path.toString + + override def toString = name def stop() = stopped = true @@ -134,9 +143,26 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { } } + // FIXME (actor path): this could become a cache for the new tree traversal actorFor + // currently still used for tmp actors (e.g. ask actor refs) private val actors = new ConcurrentHashMap[String, AnyRef] - def actorFor(address: String): Option[ActorRef] = actors.get(address) match { + // FIXME (actor path): should start at the new root guardian, and not use the tail (just to avoid the expected "app" name for now) + def actorFor(path: Iterable[String]): Option[ActorRef] = findInCache(ActorPath.join(path)) orElse findInTree(Some(app.guardian), path.tail) + + @tailrec + private def findInTree(start: Option[ActorRef], path: Iterable[String]): Option[ActorRef] = { + if (path.isEmpty) start + else { + val child = start match { + case Some(local: LocalActorRef) ⇒ local.underlying.getChild(path.head) + case _ ⇒ None + } + findInTree(child, path.tail) + } + } + + private def findInCache(path: String): Option[ActorRef] = actors.get(path) match { case null ⇒ None case actor: ActorRef ⇒ Some(actor) case future: Future[_] ⇒ Some(future.get.asInstanceOf[ActorRef]) @@ -145,26 +171,32 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { /** * Returns true if the actor was in the provider's cache and evicted successfully, else false. */ - private[akka] def evict(address: String): Boolean = actors.remove(address) ne null + private[akka] def evict(path: String): Boolean = actors.remove(path) ne null - private[akka] def actorOf(props: Props, supervisor: ActorRef, address: String, systemService: Boolean): ActorRef = { - if ((address eq null) || address == Props.randomAddress) { - val actor = new LocalActorRef(app, props, supervisor, address, systemService = true) - actors.putIfAbsent(actor.address, actor) match { + private[akka] def actorOf(props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef = + actorOf(props, supervisor, supervisor.path / name, systemService) + + private[akka] def actorOf(props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef = { + val name = path.name + if ((name eq null) || name == Props.randomName) { + val randomName: String = newUuid.toString + val newPath = path.parent / randomName + val actor = new LocalActorRef(app, props, supervisor, newPath, systemService = true) + actors.putIfAbsent(newPath.toString, actor) match { case null ⇒ actor - case other ⇒ throw new IllegalStateException("Same uuid generated twice for: " + actor + " and " + other) + case other ⇒ throw new IllegalStateException("Same path generated twice for: " + actor + " and " + other) } } else { val newFuture = Promise[ActorRef](5000)(app.dispatcher) // FIXME is this proper timeout? - actors.putIfAbsent(address, newFuture) match { + actors.putIfAbsent(path.toString, newFuture) match { case null ⇒ val actor: ActorRef = try { - (if (systemService) None else deployer.lookupDeployment(address)) match { // see if the deployment already exists, if so use it, if not create actor + (if (systemService) None else deployer.lookupDeployment(path.toString)) match { // see if the deployment already exists, if so use it, if not create actor // create a local actor case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, DeploymentConfig.LocalScope)) ⇒ - new LocalActorRef(app, props, supervisor, address, systemService) // create a local actor + new LocalActorRef(app, props, supervisor, path, systemService) // create a local actor // create a routed actor ref case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.LocalScope)) ⇒ @@ -181,10 +213,12 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) } - val connections: Iterable[ActorRef] = - if (nrOfInstances.factor > 0) Vector.fill(nrOfInstances.factor)(new LocalActorRef(app, props, supervisor, "", systemService)) else Nil + val connections: Iterable[ActorRef] = (1 to nrOfInstances.factor) map { i ⇒ + val routedPath = path.parent / (path.name + ":" + i) + new LocalActorRef(app, props, supervisor, routedPath, systemService) + } - actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, address) + actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, path.toString) case unknown ⇒ throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown) } @@ -196,7 +230,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { } newFuture completeWithResult actor - actors.replace(address, newFuture, actor) + actors.replace(path.toString, newFuture, actor) actor case actor: ActorRef ⇒ actor @@ -210,7 +244,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { /** * Creates (or fetches) a routed actor reference, configured by the 'props: RoutedProps' configuration. */ - def actorOf(props: RoutedProps, supervisor: ActorRef, address: String): ActorRef = { + def actorOf(props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = { // FIXME: this needs to take supervision into account! //FIXME clustering should be implemented by cluster actor ref provider @@ -218,16 +252,16 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { //TODO If address matches an already created actor (Ahead-of-time deployed) return that actor //TODO If address exists in config, it will override the specified Props (should we attempt to merge?) //TODO If the actor deployed uses a different config, then ignore or throw exception? - if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router") + if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router") // val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled // val localOnly = props.localOnly // if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props) // else new RoutedActorRef(props, address) - new RoutedActorRef(app, props, address) + new RoutedActorRef(app, props, supervisor, name) } - private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(actor.address) - private[akka] def serialize(actor: ActorRef): SerializedActorRef = new SerializedActorRef(actor.address, app.defaultAddress) + private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(ActorPath.split(actor.path)) + private[akka] def serialize(actor: ActorRef): SerializedActorRef = new SerializedActorRef(app.defaultAddress, actor.path.toString) private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch @@ -237,7 +271,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { case t if t.duration.length <= 0 ⇒ new DefaultPromise[Any](0)(app.dispatcher) //Abort early if nonsensical timeout case t ⇒ val a = new AskActorRef(app)(timeout = t) { def whenDone() = actors.remove(this) } - assert(actors.putIfAbsent(a.address, a) eq null) //If this fails, we're in deep trouble + assert(actors.putIfAbsent(a.path.toString, a) eq null) //If this fails, we're in deep trouble recipient.tell(message, a) a.result } diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index cb8dfd04eb..2a6795a599 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -19,16 +19,16 @@ trait ActorDeployer { private[akka] def init(deployments: Seq[Deploy]): Unit private[akka] def shutdown(): Unit //TODO Why should we have "shutdown", should be crash only? private[akka] def deploy(deployment: Deploy): Unit - private[akka] def lookupDeploymentFor(address: String): Option[Deploy] - def lookupDeployment(address: String): Option[Deploy] = address match { - case null | Props.`randomAddress` ⇒ None - case some ⇒ lookupDeploymentFor(some) + private[akka] def lookupDeploymentFor(path: String): Option[Deploy] + def lookupDeployment(path: String): Option[Deploy] = path match { + case null | Props.`randomName` ⇒ None + case some ⇒ lookupDeploymentFor(some) } private[akka] def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_)) } /** - * Deployer maps actor deployments to actor addresses. + * Deployer maps actor paths to actor deployments. * * @author Jonas Bonér */ @@ -58,36 +58,36 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { def isClustered(deployment: Deploy): Boolean = !isLocal(deployment) - def isLocal(address: String): Boolean = isLocal(deploymentFor(address)) //TODO Should this throw exception if address not found? + def isLocal(path: String): Boolean = isLocal(deploymentFor(path)) //TODO Should this throw exception if path not found? - def isClustered(address: String): Boolean = !isLocal(address) //TODO Should this throw exception if address not found? + def isClustered(path: String): Boolean = !isLocal(path) //TODO Should this throw exception if path not found? /** * Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound. */ - private[akka] def deploymentFor(address: String): Deploy = { - lookupDeploymentFor(address) match { + private[akka] def deploymentFor(path: String): Deploy = { + lookupDeploymentFor(path) match { case Some(deployment) ⇒ deployment - case None ⇒ thrownNoDeploymentBoundException(address) + case None ⇒ thrownNoDeploymentBoundException(path) } } - private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = - instance.lookupDeploymentFor(address) + private[akka] def lookupDeploymentFor(path: String): Option[Deploy] = + instance.lookupDeploymentFor(path) private[akka] def deploymentsInConfig: List[Deploy] = { for { - address ← addressesInConfig - deployment ← lookupInConfig(address) + path ← pathsInConfig + deployment ← lookupInConfig(path) } yield deployment } - private[akka] def addressesInConfig: List[String] = { + private[akka] def pathsInConfig: List[String] = { val deploymentPath = "akka.actor.deployment" app.config.getSection(deploymentPath) match { case None ⇒ Nil - case Some(addressConfig) ⇒ - addressConfig.map.keySet + case Some(pathConfig) ⇒ + pathConfig.map.keySet .map(path ⇒ path.substring(0, path.indexOf("."))) .toSet.toList // toSet to force uniqueness } @@ -96,21 +96,21 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { /** * Lookup deployment in 'akka.conf' configuration file. */ - private[akka] def lookupInConfig(address: String, configuration: Configuration = app.config): Option[Deploy] = { + private[akka] def lookupInConfig(path: String, configuration: Configuration = app.config): Option[Deploy] = { import akka.util.ReflectiveAccess.{ createInstance, emptyArguments, emptyParams, getClassFor } // -------------------------------- - // akka.actor.deployment.
+ // akka.actor.deployment. // -------------------------------- - val addressPath = "akka.actor.deployment." + address - configuration.getSection(addressPath) match { + val deploymentKey = "akka.actor.deployment." + path + configuration.getSection(deploymentKey) match { case None ⇒ None - case Some(addressConfig) ⇒ + case Some(pathConfig) ⇒ // -------------------------------- - // akka.actor.deployment.
.router + // akka.actor.deployment..router // -------------------------------- - val router: Routing = addressConfig.getString("router", "direct") match { + val router: Routing = pathConfig.getString("router", "direct") match { case "direct" ⇒ Direct case "round-robin" ⇒ RoundRobin case "random" ⇒ Random @@ -122,12 +122,12 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { } // -------------------------------- - // akka.actor.deployment.
.nr-of-instances + // akka.actor.deployment..nr-of-instances // -------------------------------- val nrOfInstances = { if (router == Direct) OneNrOfInstances else { - addressConfig.getAny("nr-of-instances", "1") match { + pathConfig.getAny("nr-of-instances", "1") match { case "auto" ⇒ AutoNrOfInstances case "1" ⇒ OneNrOfInstances case "0" ⇒ ZeroNrOfInstances @@ -137,7 +137,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { } catch { case e: Exception ⇒ throw new ConfigurationException( - "Config option [" + addressPath + + "Config option [" + deploymentKey + ".nr-of-instances] needs to be either [\"auto\"] or [1-N] - was [" + nrOfReplicas + "]") } @@ -146,37 +146,37 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { } // -------------------------------- - // akka.actor.deployment.
.create-as + // akka.actor.deployment..create-as // -------------------------------- - val recipe: Option[ActorRecipe] = addressConfig.getSection("create-as") map { section ⇒ + val recipe: Option[ActorRecipe] = pathConfig.getSection("create-as") map { section ⇒ val implementationClass = section.getString("class") match { case Some(impl) ⇒ getClassFor[Actor](impl).fold(e ⇒ throw new ConfigurationException( - "Config option [" + addressPath + ".create-as.class] load failed", e), identity) + "Config option [" + deploymentKey + ".create-as.class] load failed", e), identity) case None ⇒ throw new ConfigurationException( - "Config option [" + addressPath + ".create-as.class] is missing, need the fully qualified name of the class") + "Config option [" + deploymentKey + ".create-as.class] is missing, need the fully qualified name of the class") } ActorRecipe(implementationClass) } // -------------------------------- - // akka.actor.deployment.
.remote + // akka.actor.deployment..remote // -------------------------------- - addressConfig.getSection("remote") match { + pathConfig.getSection("remote") match { case Some(remoteConfig) ⇒ // we have a 'remote' config section - if (addressConfig.getSection("cluster").isDefined) throw new ConfigurationException( - "Configuration for deployment ID [" + address + "] can not have both 'remote' and 'cluster' sections.") + if (pathConfig.getSection("cluster").isDefined) throw new ConfigurationException( + "Configuration for deployment ID [" + path + "] can not have both 'remote' and 'cluster' sections.") // -------------------------------- - // akka.actor.deployment.
.remote.nodes + // akka.actor.deployment..remote.nodes // -------------------------------- val remoteAddresses = remoteConfig.getList("nodes") match { case Nil ⇒ Nil case nodes ⇒ def raiseRemoteNodeParsingError() = throw new ConfigurationException( - "Config option [" + addressPath + + "Config option [" + deploymentKey + ".remote.nodes] needs to be a list with elements on format \":\", was [" + nodes.mkString(", ") + "]") nodes map { node ⇒ @@ -192,26 +192,26 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { } } - Some(Deploy(address, recipe, router, nrOfInstances, RemoteScope(remoteAddresses))) + Some(Deploy(path, recipe, router, nrOfInstances, RemoteScope(remoteAddresses))) case None ⇒ // check for 'cluster' config section // -------------------------------- - // akka.actor.deployment.
.cluster + // akka.actor.deployment..cluster // -------------------------------- - addressConfig.getSection("cluster") match { + pathConfig.getSection("cluster") match { case None ⇒ None case Some(clusterConfig) ⇒ // -------------------------------- - // akka.actor.deployment.
.cluster.preferred-nodes + // akka.actor.deployment..cluster.preferred-nodes // -------------------------------- val preferredNodes = clusterConfig.getList("preferred-nodes") match { case Nil ⇒ Nil case homes ⇒ def raiseHomeConfigError() = throw new ConfigurationException( - "Config option [" + addressPath + + "Config option [" + deploymentKey + ".cluster.preferred-nodes] needs to be a list with elements on format\n'host:', 'ip:' or 'node:', was [" + homes + "]") @@ -230,18 +230,18 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { } // -------------------------------- - // akka.actor.deployment.
.cluster.replication + // akka.actor.deployment..cluster.replication // -------------------------------- clusterConfig.getSection("replication") match { case None ⇒ - Some(Deploy(address, recipe, router, nrOfInstances, deploymentConfig.ClusterScope(preferredNodes, Transient))) + Some(Deploy(path, recipe, router, nrOfInstances, deploymentConfig.ClusterScope(preferredNodes, Transient))) case Some(replicationConfig) ⇒ val storage = replicationConfig.getString("storage", "transaction-log") match { case "transaction-log" ⇒ TransactionLog case "data-grid" ⇒ DataGrid case unknown ⇒ - throw new ConfigurationException("Config option [" + addressPath + + throw new ConfigurationException("Config option [" + deploymentKey + ".cluster.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" + unknown + "]") } @@ -249,11 +249,11 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { case "write-through" ⇒ WriteThrough case "write-behind" ⇒ WriteBehind case unknown ⇒ - throw new ConfigurationException("Config option [" + addressPath + + throw new ConfigurationException("Config option [" + deploymentKey + ".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" + unknown + "]") } - Some(Deploy(address, recipe, router, nrOfInstances, deploymentConfig.ClusterScope(preferredNodes, Replication(storage, strategy)))) + Some(Deploy(path, recipe, router, nrOfInstances, deploymentConfig.ClusterScope(preferredNodes, Replication(storage, strategy)))) } } } @@ -261,13 +261,13 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { } private[akka] def throwDeploymentBoundException(deployment: Deploy): Nothing = { - val e = new DeploymentAlreadyBoundException("Address [" + deployment.address + "] already bound to [" + deployment + "]") + val e = new DeploymentAlreadyBoundException("Path [" + deployment.path + "] already bound to [" + deployment + "]") log.error(e, e.getMessage) throw e } - private[akka] def thrownNoDeploymentBoundException(address: String): Nothing = { - val e = new NoDeploymentBoundException("Address [" + address + "] is not bound to a deployment") + private[akka] def thrownNoDeploymentBoundException(path: String): Nothing = { + val e = new NoDeploymentBoundException("Path [" + path + "] is not bound to a deployment") log.error(e, e.getMessage) throw e } @@ -285,9 +285,9 @@ class LocalDeployer extends ActorDeployer { private[akka] def shutdown(): Unit = deployments.clear() //TODO do something else/more? - private[akka] def deploy(deployment: Deploy): Unit = deployments.putIfAbsent(deployment.address, deployment) + private[akka] def deploy(deployment: Deploy): Unit = deployments.putIfAbsent(deployment.path, deployment) - private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = Option(deployments.get(address)) + private[akka] def lookupDeploymentFor(path: String): Option[Deploy] = Option(deployments.get(path)) } class DeploymentException private[akka] (message: String) extends AkkaException(message) diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala index 1782cda940..ed5d8d78b1 100644 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala @@ -15,7 +15,7 @@ object DeploymentConfig { // --- Deploy // -------------------------------- case class Deploy( - address: String, + path: String, recipe: Option[ActorRecipe], routing: Routing = Direct, nrOfInstances: NrOfInstances = ZeroNrOfInstances, diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index c6732b6eca..a42ff45f6e 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -30,7 +30,7 @@ object Props { final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(defaultDecider, None, None) final val noHotSwap: Stack[Actor.Receive] = Stack.empty - final val randomAddress: String = "" + final val randomName: String = "" /** * The default Props instance, uses the settings from the Props object starting with default* diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 2974808efc..551cf15c86 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -130,15 +130,15 @@ trait TypedActorFactory { this: ActorRefFactory ⇒ * all interfaces (Class.getInterfaces) if it's not an interface class */ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R = - typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Props.randomAddress, interface.getClassLoader) + typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Props.randomName, interface.getClassLoader) /** * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or * all interfaces (Class.getInterfaces) if it's not an interface class */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, address: String): R = - typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, address, interface.getClassLoader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String): R = + typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, name, interface.getClassLoader) /** * Creates a new TypedActor proxy using the supplied Props, @@ -146,15 +146,15 @@ trait TypedActorFactory { this: ActorRefFactory ⇒ * all interfaces (Class.getInterfaces) if it's not an interface class */ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R = - typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Props.randomAddress, interface.getClassLoader) + typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Props.randomName, interface.getClassLoader) /** * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or * all interfaces (Class.getInterfaces) if it's not an interface class */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, address: String): R = - typedActor.createProxyAndTypedActor(this, interface, impl.create, props, address, interface.getClassLoader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String): R = + typedActor.createProxyAndTypedActor(this, interface, impl.create, props, name, interface.getClassLoader) /** * Creates a new TypedActor proxy using the supplied Props, @@ -162,15 +162,15 @@ trait TypedActorFactory { this: ActorRefFactory ⇒ * all interfaces (Class.getInterfaces) if it's not an interface class */ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Props.randomAddress, loader) + typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Props.randomName, loader) /** * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or * all interfaces (Class.getInterfaces) if it's not an interface class */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, address: String, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, address, loader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String, loader: ClassLoader): R = + typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, name, loader) /** * Creates a new TypedActor proxy using the supplied Props, @@ -178,73 +178,73 @@ trait TypedActorFactory { this: ActorRefFactory ⇒ * all interfaces (Class.getInterfaces) if it's not an interface class */ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Props.randomAddress, loader) + typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Props.randomName, loader) /** * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or * all interfaces (Class.getInterfaces) if it's not an interface class */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, address: String, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, interface, impl.create, props, address, loader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String, loader: ClassLoader): R = + typedActor.createProxyAndTypedActor(this, interface, impl.create, props, name, loader) /** * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) */ def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, Props.randomAddress, loader) + typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, Props.randomName, loader) /** * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) */ - def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, address: String, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, address, loader) + def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, name: String, loader: ClassLoader): R = + typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, name, loader) /** * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) */ - def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), address: String = Props.randomAddress, loader: ClassLoader = null)(implicit m: Manifest[T]): R = { + def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), name: String = Props.randomName, loader: ClassLoader = null)(implicit m: Manifest[T]): R = { val clazz = m.erasure.asInstanceOf[Class[T]] - typedActor.createProxyAndTypedActor(this, clazz, clazz.newInstance, props, address, if (loader eq null) clazz.getClassLoader else loader) + typedActor.createProxyAndTypedActor(this, clazz, clazz.newInstance, props, name, if (loader eq null) clazz.getClassLoader else loader) } /** * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * to create TypedActor proxies, use typedActorOf */ - def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), address: String = Props.randomAddress, loader: ClassLoader = null)(implicit m: Manifest[R]): R = - typedActor.createProxy[R](this, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, Props.randomAddress, if (loader eq null) m.erasure.getClassLoader else loader) + def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), name: String = Props.randomName, loader: ClassLoader = null)(implicit m: Manifest[R]): R = + typedActor.createProxy[R](this, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, Props.randomName, if (loader eq null) m.erasure.getClassLoader else loader) /** * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * to create TypedActor proxies, use typedActorOf */ def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R = - typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, Props.randomAddress, loader) + typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, Props.randomName, loader) /** * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * to create TypedActor proxies, use typedActorOf */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, address: String, loader: ClassLoader): R = - typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, address, loader) + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, name: String, loader: ClassLoader): R = + typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, name, loader) /** * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * to create TypedActor proxies, use typedActorOf */ def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, loader: ClassLoader): R = - typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, Props.randomAddress, loader) + typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, Props.randomName, loader) /** * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * to create TypedActor proxies, use typedActorOf */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, address: String, loader: ClassLoader): R = - typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, address, loader) + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, name: String, loader: ClassLoader): R = + typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, name, loader) } @@ -302,15 +302,15 @@ class TypedActor(val app: AkkaApplication) { } else null - private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, address: String, loader: ClassLoader): R = { + private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, name: String, loader: ClassLoader): R = { val proxyVar = new AtomVar[R] - configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), address, loader) + configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), name, loader) } - private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: ⇒ T, props: Props, address: String, loader: ClassLoader): R = - createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), props, address, loader) + private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: ⇒ T, props: Props, name: String, loader: ClassLoader): R = + createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), props, name, loader) - private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, address: String, loader: ClassLoader): T = { + private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, name: String, loader: ClassLoader): T = { //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling val actorVar = new AtomVar[ActorRef](null) val timeout = props.timeout match { @@ -319,7 +319,7 @@ class TypedActor(val app: AkkaApplication) { } val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar, timeout)).asInstanceOf[T] proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive - val ref = supervisor.actorOf(props, address) + val ref = supervisor.actorOf(props, name) actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet proxyVar.get } diff --git a/akka-actor/src/main/scala/akka/actor/package.scala b/akka-actor/src/main/scala/akka/actor/package.scala index 0178db875f..49d4a2d660 100644 --- a/akka-actor/src/main/scala/akka/actor/package.scala +++ b/akka-actor/src/main/scala/akka/actor/package.scala @@ -8,6 +8,10 @@ package object actor { implicit def actorRef2Scala(ref: ActorRef): ScalaActorRef = ref.asInstanceOf[ScalaActorRef] implicit def scala2ActorRef(ref: ScalaActorRef): ActorRef = ref.asInstanceOf[ActorRef] + // actor path can be used as an actor ref (note: does a lookup in the app using path.ref) + implicit def actorPath2Ref(path: ActorPath): ActorRef = path.ref.getOrElse(path.app.deadLetters) + implicit def actorPath2ScalaRef(path: ActorPath): ScalaActorRef = actorPath2Ref(path).asInstanceOf[ScalaActorRef] + type Uuid = com.eaio.uuid.UUID def newUuid(): Uuid = new Uuid() diff --git a/akka-actor/src/main/scala/akka/config/ConfigParser.scala b/akka-actor/src/main/scala/akka/config/ConfigParser.scala index 39b961a24d..4b3d4abdaa 100644 --- a/akka-actor/src/main/scala/akka/config/ConfigParser.scala +++ b/akka-actor/src/main/scala/akka/config/ConfigParser.scala @@ -25,9 +25,9 @@ class ConfigParser(var prefix: String = "", map: mutable.Map[String, Any] = muta val numberToken: Parser[String] = """-?\d+(\.\d+)?""".r val stringToken: Parser[String] = ("\"" + """([^\\\"]|\\[^ux]|\\\n|\\u[0-9a-fA-F]{4}|\\x[0-9a-fA-F]{2})*""" + "\"").r val booleanToken: Parser[String] = "(true|on|false|off)".r - val identToken: Parser[String] = """([\da-zA-Z_][-\w]*)(\.[a-zA-Z_][-\w]*)*""".r + val identToken: Parser[String] = """([\da-zA-Z_/][-\w]*)(\.[a-zA-Z_/][-/\w]*)*""".r val assignToken: Parser[String] = "=".r - val sectionToken: Parser[String] = """[a-zA-Z][-\w]*""".r + val sectionToken: Parser[String] = """[a-zA-Z_/][-/\w]*""".r // values diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 950c23f2b1..f780f0fcc6 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -3,7 +3,7 @@ */ package akka.event -import akka.actor.{ Actor, ActorRef, MinimalActorRef, LocalActorRef, Props } +import akka.actor.{ Actor, ActorPath, ActorRef, MinimalActorRef, LocalActorRef, Props } import akka.{ AkkaException, AkkaApplication } import akka.AkkaApplication.AkkaConfig import akka.util.ReflectiveAccess @@ -129,7 +129,7 @@ trait LoggingBus extends ActorEventBus { } private def addLogger(app: AkkaApplication, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = { - val actor = app.systemActorOf(Props(clazz), Props.randomAddress) + val actor = app.systemActorOf(Props(clazz), Props.randomName) actor ! InitializeLogger(this) AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(actor, classFor(l))) publish(Info(this, "logger " + clazz.getName + " started")) @@ -339,6 +339,9 @@ object Logging { * akka.stdout-loglevel in akka.conf. */ class StandardOutLogger extends MinimalActorRef with StdOutLogger { + override val name: String = "standard-out-logger" + val path: ActorPath = null // pathless + val address: String = name override val toString = "StandardOutLogger" override def postMessageToMailbox(obj: Any, sender: ActorRef) { print(obj) } } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 88026249bb..e78b99e572 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -104,7 +104,12 @@ abstract private[akka] class AbstractRoutedActorRef(val app: AkkaApplication, va * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to * on (or more) of these actors. */ -private[akka] class RoutedActorRef(app: AkkaApplication, val routedProps: RoutedProps, override val address: String) extends AbstractRoutedActorRef(app, routedProps) { +private[akka] class RoutedActorRef(app: AkkaApplication, val routedProps: RoutedProps, val supervisor: ActorRef, override val name: String) extends AbstractRoutedActorRef(app, routedProps) { + + val path = supervisor.path / name + + // FIXME (actor path): address normally has host and port, what about routed actor ref? + def address = "routed:/" + path.toString @volatile private var running: Boolean = true diff --git a/akka-camel-typed/src/main/scala/akka/camel/TypedCamel.scala b/akka-camel-typed/src/main/scala/akka/camel/TypedCamel.scala index 2e1d19817b..67499bb02b 100644 --- a/akka-camel-typed/src/main/scala/akka/camel/TypedCamel.scala +++ b/akka-camel-typed/src/main/scala/akka/camel/TypedCamel.scala @@ -32,8 +32,8 @@ private[camel] object TypedCamel { * and re-uses the activationTracker of service. */ def onCamelServiceStart(service: CamelService) { - consumerPublisher = new LocalActorRef(Props(new TypedConsumerPublisher(service.activationTracker)), Props.randomAddress, true) - publishRequestor = new LocalActorRef(Props(new TypedConsumerPublishRequestor), Props.randomAddress, true) + consumerPublisher = new LocalActorRef(Props(new TypedConsumerPublisher(service.activationTracker)), Props.randomName, true) + publishRequestor = new LocalActorRef(Props(new TypedConsumerPublishRequestor), Props.randomName, true) registerPublishRequestor diff --git a/akka-camel/src/main/scala/akka/camel/CamelService.scala b/akka-camel/src/main/scala/akka/camel/CamelService.scala index eeb039cb8d..643d18fb18 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelService.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelService.scala @@ -26,9 +26,9 @@ import TypedCamelAccess._ * @author Martin Krasser */ trait CamelService extends Bootable { - private[camel] val activationTracker = new LocalActorRef(Props[ActivationTracker], Props.randomAddress, true) - private[camel] val consumerPublisher = new LocalActorRef(Props(new ConsumerPublisher(activationTracker)), Props.randomAddress, true) - private[camel] val publishRequestor = new LocalActorRef(Props(new ConsumerPublishRequestor), Props.randomAddress, true) + private[camel] val activationTracker = new LocalActorRef(Props[ActivationTracker], Props.randomName, true) + private[camel] val consumerPublisher = new LocalActorRef(Props(new ConsumerPublisher(activationTracker)), Props.randomName, true) + private[camel] val publishRequestor = new LocalActorRef(Props(new ConsumerPublishRequestor), Props.randomName, true) private val serviceEnabled = config.getList("akka.enabled-modules").exists(_ == "camel") diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 7227839c9a..274ac6e9dc 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -1860,7 +1860,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { Props( self ⇒ { case f: Function0[_] ⇒ try { f() } finally { self.stop() } - }).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Function0[Unit]]) + }).copy(dispatcher = computeGridDispatcher), Props.randomName, systemService = true) ! payloadFor(message, classOf[Function0[Unit]]) } def handle_fun0_any(message: RemoteProtocol.RemoteSystemDaemonMessageProtocol) { @@ -1868,7 +1868,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { Props( self ⇒ { case f: Function0[_] ⇒ try { self.reply(f()) } finally { self.stop() } - }).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Function0[Any]]) + }).copy(dispatcher = computeGridDispatcher), Props.randomName, systemService = true) forward payloadFor(message, classOf[Function0[Any]]) } def handle_fun1_arg_unit(message: RemoteProtocol.RemoteSystemDaemonMessageProtocol) { @@ -1876,7 +1876,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { Props( self ⇒ { case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { self.stop() } - }).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]]) + }).copy(dispatcher = computeGridDispatcher), Props.randomName, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]]) } def handle_fun1_arg_any(message: RemoteProtocol.RemoteSystemDaemonMessageProtocol) { @@ -1884,7 +1884,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { Props( self ⇒ { case (fun: Function[_, _], param: Any) ⇒ try { self.reply(fun.asInstanceOf[Any ⇒ Any](param)) } finally { self.stop() } - }).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]]) + }).copy(dispatcher = computeGridDispatcher), Props.randomName, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]]) } def handleFailover(message: RemoteProtocol.RemoteSystemDaemonMessageProtocol) { diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index e7f4f2c4b5..eb383658b8 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -31,7 +31,7 @@ abstract class DurableMailboxSpec(val backendName: String, val storage: DurableM "should handle reply to ! for 1 message" in { val latch = new CountDownLatch(1) val queueActor = createMailboxTestActor(backendName + " should handle reply to !") - val sender = new LocalActorRef(Props(self ⇒ { case "sum" ⇒ latch.countDown }), Props.randomAddress, true) + val sender = new LocalActorRef(Props(self ⇒ { case "sum" ⇒ latch.countDown }), Props.randomName, true) queueActor.!("sum")(Some(sender)) latch.await(10, TimeUnit.SECONDS) must be(true) @@ -40,7 +40,7 @@ abstract class DurableMailboxSpec(val backendName: String, val storage: DurableM "should handle reply to ! for multiple messages" in { val latch = new CountDownLatch(5) val queueActor = createMailboxTestActor(backendName + " should handle reply to !") - val sender = new LocalActorRef(Props(self ⇒ { case "sum" ⇒ latch.countDown }), Props.randomAddress, true) + val sender = new LocalActorRef(Props(self ⇒ { case "sum" ⇒ latch.countDown }), Props.randomName, true) for (i ← 1 to 5) queueActor.!("sum")(Some(sender)) diff --git a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java index 89ee558709..abe7edf647 100644 --- a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java @@ -2711,17 +2711,17 @@ public final class RemoteProtocol { public interface ActorRefProtocolOrBuilder extends com.google.protobuf.MessageOrBuilder { - // required string address = 1; - boolean hasAddress(); - String getAddress(); - - // required string host = 2; + // required string host = 1; boolean hasHost(); String getHost(); - // required uint32 port = 3; + // required uint32 port = 2; boolean hasPort(); int getPort(); + + // required string path = 3; + boolean hasPath(); + String getPath(); } public static final class ActorRefProtocol extends com.google.protobuf.GeneratedMessage @@ -2752,43 +2752,11 @@ public final class RemoteProtocol { } private int bitField0_; - // required string address = 1; - public static final int ADDRESS_FIELD_NUMBER = 1; - private java.lang.Object address_; - public boolean hasAddress() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - public String getAddress() { - java.lang.Object ref = address_; - if (ref instanceof String) { - return (String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - String s = bs.toStringUtf8(); - if (com.google.protobuf.Internal.isValidUtf8(bs)) { - address_ = s; - } - return s; - } - } - private com.google.protobuf.ByteString getAddressBytes() { - java.lang.Object ref = address_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8((String) ref); - address_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - // required string host = 2; - public static final int HOST_FIELD_NUMBER = 2; + // required string host = 1; + public static final int HOST_FIELD_NUMBER = 1; private java.lang.Object host_; public boolean hasHost() { - return ((bitField0_ & 0x00000002) == 0x00000002); + return ((bitField0_ & 0x00000001) == 0x00000001); } public String getHost() { java.lang.Object ref = host_; @@ -2816,30 +2784,58 @@ public final class RemoteProtocol { } } - // required uint32 port = 3; - public static final int PORT_FIELD_NUMBER = 3; + // required uint32 port = 2; + public static final int PORT_FIELD_NUMBER = 2; private int port_; public boolean hasPort() { - return ((bitField0_ & 0x00000004) == 0x00000004); + return ((bitField0_ & 0x00000002) == 0x00000002); } public int getPort() { return port_; } + // required string path = 3; + public static final int PATH_FIELD_NUMBER = 3; + private java.lang.Object path_; + public boolean hasPath() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public String getPath() { + java.lang.Object ref = path_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + path_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getPathBytes() { + java.lang.Object ref = path_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + path_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { - address_ = ""; host_ = ""; port_ = 0; + path_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { byte isInitialized = memoizedIsInitialized; if (isInitialized != -1) return isInitialized == 1; - if (!hasAddress()) { - memoizedIsInitialized = 0; - return false; - } if (!hasHost()) { memoizedIsInitialized = 0; return false; @@ -2848,6 +2844,10 @@ public final class RemoteProtocol { memoizedIsInitialized = 0; return false; } + if (!hasPath()) { + memoizedIsInitialized = 0; + return false; + } memoizedIsInitialized = 1; return true; } @@ -2856,13 +2856,13 @@ public final class RemoteProtocol { throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeBytes(1, getAddressBytes()); + output.writeBytes(1, getHostBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeBytes(2, getHostBytes()); + output.writeUInt32(2, port_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeUInt32(3, port_); + output.writeBytes(3, getPathBytes()); } getUnknownFields().writeTo(output); } @@ -2875,15 +2875,15 @@ public final class RemoteProtocol { size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(1, getAddressBytes()); + .computeBytesSize(1, getHostBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(2, getHostBytes()); + .computeUInt32Size(2, port_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream - .computeUInt32Size(3, port_); + .computeBytesSize(3, getPathBytes()); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -3009,11 +3009,11 @@ public final class RemoteProtocol { public Builder clear() { super.clear(); - address_ = ""; - bitField0_ = (bitField0_ & ~0x00000001); host_ = ""; - bitField0_ = (bitField0_ & ~0x00000002); + bitField0_ = (bitField0_ & ~0x00000001); port_ = 0; + bitField0_ = (bitField0_ & ~0x00000002); + path_ = ""; bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -3056,15 +3056,15 @@ public final class RemoteProtocol { if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } - result.address_ = address_; + result.host_ = host_; if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } - result.host_ = host_; + result.port_ = port_; if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } - result.port_ = port_; + result.path_ = path_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -3081,24 +3081,20 @@ public final class RemoteProtocol { public Builder mergeFrom(akka.remote.RemoteProtocol.ActorRefProtocol other) { if (other == akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) return this; - if (other.hasAddress()) { - setAddress(other.getAddress()); - } if (other.hasHost()) { setHost(other.getHost()); } if (other.hasPort()) { setPort(other.getPort()); } + if (other.hasPath()) { + setPath(other.getPath()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } public final boolean isInitialized() { - if (!hasAddress()) { - - return false; - } if (!hasHost()) { return false; @@ -3107,6 +3103,10 @@ public final class RemoteProtocol { return false; } + if (!hasPath()) { + + return false; + } return true; } @@ -3135,65 +3135,29 @@ public final class RemoteProtocol { } case 10: { bitField0_ |= 0x00000001; - address_ = input.readBytes(); - break; - } - case 18: { - bitField0_ |= 0x00000002; host_ = input.readBytes(); break; } - case 24: { - bitField0_ |= 0x00000004; + case 16: { + bitField0_ |= 0x00000002; port_ = input.readUInt32(); break; } + case 26: { + bitField0_ |= 0x00000004; + path_ = input.readBytes(); + break; + } } } } private int bitField0_; - // required string address = 1; - private java.lang.Object address_ = ""; - public boolean hasAddress() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - public String getAddress() { - java.lang.Object ref = address_; - if (!(ref instanceof String)) { - String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); - address_ = s; - return s; - } else { - return (String) ref; - } - } - public Builder setAddress(String value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000001; - address_ = value; - onChanged(); - return this; - } - public Builder clearAddress() { - bitField0_ = (bitField0_ & ~0x00000001); - address_ = getDefaultInstance().getAddress(); - onChanged(); - return this; - } - void setAddress(com.google.protobuf.ByteString value) { - bitField0_ |= 0x00000001; - address_ = value; - onChanged(); - } - - // required string host = 2; + // required string host = 1; private java.lang.Object host_ = ""; public boolean hasHost() { - return ((bitField0_ & 0x00000002) == 0x00000002); + return ((bitField0_ & 0x00000001) == 0x00000001); } public String getHost() { java.lang.Object ref = host_; @@ -3209,44 +3173,80 @@ public final class RemoteProtocol { if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000002; + bitField0_ |= 0x00000001; host_ = value; onChanged(); return this; } public Builder clearHost() { - bitField0_ = (bitField0_ & ~0x00000002); + bitField0_ = (bitField0_ & ~0x00000001); host_ = getDefaultInstance().getHost(); onChanged(); return this; } void setHost(com.google.protobuf.ByteString value) { - bitField0_ |= 0x00000002; + bitField0_ |= 0x00000001; host_ = value; onChanged(); } - // required uint32 port = 3; + // required uint32 port = 2; private int port_ ; public boolean hasPort() { - return ((bitField0_ & 0x00000004) == 0x00000004); + return ((bitField0_ & 0x00000002) == 0x00000002); } public int getPort() { return port_; } public Builder setPort(int value) { - bitField0_ |= 0x00000004; + bitField0_ |= 0x00000002; port_ = value; onChanged(); return this; } public Builder clearPort() { - bitField0_ = (bitField0_ & ~0x00000004); + bitField0_ = (bitField0_ & ~0x00000002); port_ = 0; onChanged(); return this; } + // required string path = 3; + private java.lang.Object path_ = ""; + public boolean hasPath() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public String getPath() { + java.lang.Object ref = path_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + path_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setPath(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + path_ = value; + onChanged(); + return this; + } + public Builder clearPath() { + bitField0_ = (bitField0_ & ~0x00000004); + path_ = getDefaultInstance().getPath(); + onChanged(); + return this; + } + void setPath(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000004; + path_ = value; + onChanged(); + } + // @@protoc_insertion_point(builder_scope:ActorRefProtocol) } @@ -5469,9 +5469,9 @@ public final class RemoteProtocol { boolean hasMessageType(); akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType getMessageType(); - // optional string actorAddress = 2; - boolean hasActorAddress(); - String getActorAddress(); + // optional string actorPath = 2; + boolean hasActorPath(); + String getActorPath(); // optional bytes payload = 3; boolean hasPayload(); @@ -5521,14 +5521,14 @@ public final class RemoteProtocol { return messageType_; } - // optional string actorAddress = 2; - public static final int ACTORADDRESS_FIELD_NUMBER = 2; - private java.lang.Object actorAddress_; - public boolean hasActorAddress() { + // optional string actorPath = 2; + public static final int ACTORPATH_FIELD_NUMBER = 2; + private java.lang.Object actorPath_; + public boolean hasActorPath() { return ((bitField0_ & 0x00000002) == 0x00000002); } - public String getActorAddress() { - java.lang.Object ref = actorAddress_; + public String getActorPath() { + java.lang.Object ref = actorPath_; if (ref instanceof String) { return (String) ref; } else { @@ -5536,17 +5536,17 @@ public final class RemoteProtocol { (com.google.protobuf.ByteString) ref; String s = bs.toStringUtf8(); if (com.google.protobuf.Internal.isValidUtf8(bs)) { - actorAddress_ = s; + actorPath_ = s; } return s; } } - private com.google.protobuf.ByteString getActorAddressBytes() { - java.lang.Object ref = actorAddress_; + private com.google.protobuf.ByteString getActorPathBytes() { + java.lang.Object ref = actorPath_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8((String) ref); - actorAddress_ = b; + actorPath_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; @@ -5578,7 +5578,7 @@ public final class RemoteProtocol { private void initFields() { messageType_ = akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType.STOP; - actorAddress_ = ""; + actorPath_ = ""; payload_ = com.google.protobuf.ByteString.EMPTY; replicateActorFromUuid_ = akka.remote.RemoteProtocol.UuidProtocol.getDefaultInstance(); } @@ -5608,7 +5608,7 @@ public final class RemoteProtocol { output.writeEnum(1, messageType_.getNumber()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeBytes(2, getActorAddressBytes()); + output.writeBytes(2, getActorPathBytes()); } if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBytes(3, payload_); @@ -5631,7 +5631,7 @@ public final class RemoteProtocol { } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(2, getActorAddressBytes()); + .computeBytesSize(2, getActorPathBytes()); } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream @@ -5768,7 +5768,7 @@ public final class RemoteProtocol { super.clear(); messageType_ = akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType.STOP; bitField0_ = (bitField0_ & ~0x00000001); - actorAddress_ = ""; + actorPath_ = ""; bitField0_ = (bitField0_ & ~0x00000002); payload_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000004); @@ -5823,7 +5823,7 @@ public final class RemoteProtocol { if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } - result.actorAddress_ = actorAddress_; + result.actorPath_ = actorPath_; if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } @@ -5855,8 +5855,8 @@ public final class RemoteProtocol { if (other.hasMessageType()) { setMessageType(other.getMessageType()); } - if (other.hasActorAddress()) { - setActorAddress(other.getActorAddress()); + if (other.hasActorPath()) { + setActorPath(other.getActorPath()); } if (other.hasPayload()) { setPayload(other.getPayload()); @@ -5918,7 +5918,7 @@ public final class RemoteProtocol { } case 18: { bitField0_ |= 0x00000002; - actorAddress_ = input.readBytes(); + actorPath_ = input.readBytes(); break; } case 26: { @@ -5965,39 +5965,39 @@ public final class RemoteProtocol { return this; } - // optional string actorAddress = 2; - private java.lang.Object actorAddress_ = ""; - public boolean hasActorAddress() { + // optional string actorPath = 2; + private java.lang.Object actorPath_ = ""; + public boolean hasActorPath() { return ((bitField0_ & 0x00000002) == 0x00000002); } - public String getActorAddress() { - java.lang.Object ref = actorAddress_; + public String getActorPath() { + java.lang.Object ref = actorPath_; if (!(ref instanceof String)) { String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); - actorAddress_ = s; + actorPath_ = s; return s; } else { return (String) ref; } } - public Builder setActorAddress(String value) { + public Builder setActorPath(String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000002; - actorAddress_ = value; + actorPath_ = value; onChanged(); return this; } - public Builder clearActorAddress() { + public Builder clearActorPath() { bitField0_ = (bitField0_ & ~0x00000002); - actorAddress_ = getDefaultInstance().getActorAddress(); + actorPath_ = getDefaultInstance().getActorPath(); onChanged(); return this; } - void setActorAddress(com.google.protobuf.ByteString value) { + void setActorPath(com.google.protobuf.ByteString value) { bitField0_ |= 0x00000002; - actorAddress_ = value; + actorPath_ = value; onChanged(); } @@ -6864,35 +6864,35 @@ public final class RemoteProtocol { "\0132\026.MetadataEntryProtocol\"l\n\025RemoteContr" + "olProtocol\022!\n\013commandType\030\001 \002(\0162\014.Comman", "dType\022\016\n\006cookie\030\002 \001(\t\022 \n\006origin\030\003 \001(\0132\020." + - "AddressProtocol\"?\n\020ActorRefProtocol\022\017\n\007a" + - "ddress\030\001 \002(\t\022\014\n\004host\030\002 \002(\t\022\014\n\004port\030\003 \002(\r" + - "\";\n\017MessageProtocol\022\017\n\007message\030\001 \002(\014\022\027\n\017" + - "messageManifest\030\002 \001(\014\")\n\014UuidProtocol\022\014\n" + - "\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025MetadataEntr" + - "yProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"1\n" + - "\017AddressProtocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004po" + - "rt\030\002 \002(\r\"7\n\021ExceptionProtocol\022\021\n\tclassna" + - "me\030\001 \002(\t\022\017\n\007message\030\002 \002(\t\"\256\001\n!RemoteSyst", - "emDaemonMessageProtocol\0223\n\013messageType\030\001" + - " \002(\0162\036.RemoteSystemDaemonMessageType\022\024\n\014" + - "actorAddress\030\002 \001(\t\022\017\n\007payload\030\003 \001(\014\022-\n\026r" + - "eplicateActorFromUuid\030\004 \001(\0132\r.UuidProtoc" + - "ol\"y\n\035DurableMailboxMessageProtocol\022$\n\tr" + - "ecipient\030\001 \002(\0132\021.ActorRefProtocol\022!\n\006sen" + - "der\030\002 \001(\0132\021.ActorRefProtocol\022\017\n\007message\030" + - "\003 \002(\014*(\n\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHU" + - "TDOWN\020\002*K\n\026ReplicationStorageType\022\r\n\tTRA" + - "NSIENT\020\001\022\023\n\017TRANSACTION_LOG\020\002\022\r\n\tDATA_GR", - "ID\020\003*>\n\027ReplicationStrategyType\022\021\n\rWRITE" + - "_THROUGH\020\001\022\020\n\014WRITE_BEHIND\020\002*\241\002\n\035RemoteS" + - "ystemDaemonMessageType\022\010\n\004STOP\020\001\022\007\n\003USE\020" + - "\002\022\013\n\007RELEASE\020\003\022\022\n\016MAKE_AVAILABLE\020\004\022\024\n\020MA" + - "KE_UNAVAILABLE\020\005\022\016\n\nDISCONNECT\020\006\022\r\n\tRECO" + - "NNECT\020\007\022\n\n\006RESIGN\020\010\022\n\n\006GOSSIP\020\t\022\031\n\025FAIL_" + - "OVER_CONNECTIONS\020\024\022\026\n\022FUNCTION_FUN0_UNIT" + - "\020\025\022\025\n\021FUNCTION_FUN0_ANY\020\026\022\032\n\026FUNCTION_FU" + - "N1_ARG_UNIT\020\027\022\031\n\025FUNCTION_FUN1_ARG_ANY\020\030" + - "B\017\n\013akka.remoteH\001" + "AddressProtocol\"<\n\020ActorRefProtocol\022\014\n\004h" + + "ost\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\022\014\n\004path\030\003 \002(\t\";\n" + + "\017MessageProtocol\022\017\n\007message\030\001 \002(\014\022\027\n\017mes" + + "sageManifest\030\002 \001(\014\")\n\014UuidProtocol\022\014\n\004hi" + + "gh\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025MetadataEntryPr" + + "otocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"1\n\017Ad" + + "dressProtocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030" + + "\002 \002(\r\"7\n\021ExceptionProtocol\022\021\n\tclassname\030" + + "\001 \002(\t\022\017\n\007message\030\002 \002(\t\"\253\001\n!RemoteSystemD", + "aemonMessageProtocol\0223\n\013messageType\030\001 \002(" + + "\0162\036.RemoteSystemDaemonMessageType\022\021\n\tact" + + "orPath\030\002 \001(\t\022\017\n\007payload\030\003 \001(\014\022-\n\026replica" + + "teActorFromUuid\030\004 \001(\0132\r.UuidProtocol\"y\n\035" + + "DurableMailboxMessageProtocol\022$\n\trecipie" + + "nt\030\001 \002(\0132\021.ActorRefProtocol\022!\n\006sender\030\002 " + + "\001(\0132\021.ActorRefProtocol\022\017\n\007message\030\003 \002(\014*" + + "(\n\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020" + + "\002*K\n\026ReplicationStorageType\022\r\n\tTRANSIENT" + + "\020\001\022\023\n\017TRANSACTION_LOG\020\002\022\r\n\tDATA_GRID\020\003*>", + "\n\027ReplicationStrategyType\022\021\n\rWRITE_THROU" + + "GH\020\001\022\020\n\014WRITE_BEHIND\020\002*\241\002\n\035RemoteSystemD" + + "aemonMessageType\022\010\n\004STOP\020\001\022\007\n\003USE\020\002\022\013\n\007R" + + "ELEASE\020\003\022\022\n\016MAKE_AVAILABLE\020\004\022\024\n\020MAKE_UNA" + + "VAILABLE\020\005\022\016\n\nDISCONNECT\020\006\022\r\n\tRECONNECT\020" + + "\007\022\n\n\006RESIGN\020\010\022\n\n\006GOSSIP\020\t\022\031\n\025FAIL_OVER_C" + + "ONNECTIONS\020\024\022\026\n\022FUNCTION_FUN0_UNIT\020\025\022\025\n\021" + + "FUNCTION_FUN0_ANY\020\026\022\032\n\026FUNCTION_FUN1_ARG" + + "_UNIT\020\027\022\031\n\025FUNCTION_FUN1_ARG_ANY\020\030B\017\n\013ak" + + "ka.remoteH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6928,7 +6928,7 @@ public final class RemoteProtocol { internal_static_ActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ActorRefProtocol_descriptor, - new java.lang.String[] { "Address", "Host", "Port", }, + new java.lang.String[] { "Host", "Port", "Path", }, akka.remote.RemoteProtocol.ActorRefProtocol.class, akka.remote.RemoteProtocol.ActorRefProtocol.Builder.class); internal_static_MessageProtocol_descriptor = @@ -6976,7 +6976,7 @@ public final class RemoteProtocol { internal_static_RemoteSystemDaemonMessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteSystemDaemonMessageProtocol_descriptor, - new java.lang.String[] { "MessageType", "ActorAddress", "Payload", "ReplicateActorFromUuid", }, + new java.lang.String[] { "MessageType", "ActorPath", "Payload", "ReplicateActorFromUuid", }, akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.class, akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.Builder.class); internal_static_DurableMailboxMessageProtocol_descriptor = diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index d777009950..6bd62c8c16 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -66,9 +66,9 @@ enum ReplicationStrategyType { * on the original node. */ message ActorRefProtocol { - required string address = 1; - required string host = 2; - required uint32 port = 3; + required string host = 1; + required uint32 port = 2; + required string path = 3; } /** @@ -116,7 +116,7 @@ message ExceptionProtocol { */ message RemoteSystemDaemonMessageProtocol { required RemoteSystemDaemonMessageType messageType = 1; - optional string actorAddress = 2; + optional string actorPath = 2; optional bytes payload = 3; optional UuidProtocol replicateActorFromUuid = 4; } diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 4a5efc4f40..0226d771a8 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -157,7 +157,7 @@ class Gossiper(remote: Remote) { node ← oldAvailableNodes if connectionManager.connectionFor(node).isEmpty } { - val connectionFactory = () ⇒ RemoteActorRef(remote.server, gossipingNode, remote.remoteDaemonServiceName, None) + val connectionFactory = () ⇒ RemoteActorRef(remote.server, gossipingNode, remote.remoteDaemon.path, None) connectionManager.putIfAbsent(node, connectionFactory) // create a new remote connection to the new node oldState.nodeMembershipChangeListeners foreach (_ nodeConnected node) // notify listeners about the new nodes } @@ -310,7 +310,7 @@ class Gossiper(remote: Remote) { RemoteSystemDaemonMessageProtocol.newBuilder .setMessageType(GOSSIP) - .setActorAddress(remote.remoteDaemonServiceName) + .setActorPath(remote.remoteDaemon.path.toString) .setPayload(ByteString.copyFrom(gossipAsBytes)) .build() } diff --git a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala index 872b2c23f3..698f128897 100644 --- a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala +++ b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala @@ -67,7 +67,7 @@ class NetworkEventStream(val app: AkkaApplication) { // FIXME: check that this supervision is correct private[akka] val sender = app.provider.actorOf( Props[Channel].copy(dispatcher = app.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")), - app.guardian, Props.randomAddress, systemService = true) + app.guardian, Props.randomName, systemService = true) /** * Registers a network event stream listener (asyncronously). diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 31e7f77b3d..3c939bf67e 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -51,7 +51,7 @@ class Remote(val app: AkkaApplication) { val computeGridDispatcher = dispatcherFactory.newDispatcher("akka:compute-grid").build private[remote] lazy val remoteDaemonSupervisor = app.actorOf(Props( - OneForOneStrategy(List(classOf[Exception]), None, None))) // is infinite restart what we want? + OneForOneStrategy(List(classOf[Exception]), None, None)), "akka-system-remote-supervisor") // is infinite restart what we want? private[remote] lazy val remoteDaemon = app.provider.actorOf( @@ -129,7 +129,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { def handleUse(message: RemoteSystemDaemonMessageProtocol) { try { - if (message.hasActorAddress) { + if (message.hasActorPath) { val actorFactoryBytes = if (shouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray @@ -140,7 +140,15 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { case Right(instance) ⇒ instance.asInstanceOf[() ⇒ Actor] } - app.actorOf(Props(creator = actorFactory), message.getActorAddress) + val actorPath = ActorPath(remote.app, message.getActorPath) + val parent = actorPath.parent.ref + + if (parent.isDefined) { + app.provider.actorOf(Props(creator = actorFactory), parent.get, actorPath.name) + } else { + log.error("Parent actor does not exist, ignoring remote system daemon command [{}]", message) + } + } else { log.error("Actor 'address' for actor to instantiate is not defined, ignoring remote system daemon command [{}]", message) } @@ -180,7 +188,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { Props( context ⇒ { case f: Function0[_] ⇒ try { f() } finally { context.self.stop() } - }).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Function0[Unit]]) + }).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) ! payloadFor(message, classOf[Function0[Unit]]) } // FIXME: handle real remote supervision @@ -189,7 +197,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { Props( context ⇒ { case f: Function0[_] ⇒ try { sender ! f() } finally { context.self.stop() } - }).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Function0[Any]]) + }).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) forward payloadFor(message, classOf[Function0[Any]]) } // FIXME: handle real remote supervision @@ -198,7 +206,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { Props( context ⇒ { case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { context.self.stop() } - }).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]]) + }).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]]) } // FIXME: handle real remote supervision @@ -207,7 +215,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { Props( context ⇒ { case (fun: Function[_, _], param: Any) ⇒ try { sender ! fun.asInstanceOf[Any ⇒ Any](param) } finally { context.self.stop() } - }).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]]) + }).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]]) } def handleFailover(message: RemoteSystemDaemonMessageProtocol) { @@ -227,10 +235,11 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo lazy val sender: ActorRef = if (input.hasSender) remote.app.provider.deserialize( - SerializedActorRef(input.getSender.getAddress, input.getSender.getHost, input.getSender.getPort)).getOrElse(throw new IllegalStateException("OHNOES")) + SerializedActorRef(input.getSender.getHost, input.getSender.getPort, input.getSender.getPath)).getOrElse(throw new IllegalStateException("OHNOES")) else remote.app.deadLetters - lazy val recipient: ActorRef = remote.app.actorFor(input.getRecipient.getAddress).getOrElse(remote.app.deadLetters) + + lazy val recipient: ActorRef = remote.app.actorFor(input.getRecipient.getPath).getOrElse(remote.app.deadLetters) lazy val payload: Either[Throwable, AnyRef] = if (input.hasException) Left(parseException()) @@ -252,7 +261,7 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo } } - override def toString = "RemoteMessage: " + recipient + "(" + input.getRecipient.getAddress + ") from " + sender + override def toString = "RemoteMessage: " + recipient + "(" + input.getRecipient.getPath + ") from " + sender } trait RemoteMarshallingOps { @@ -276,7 +285,7 @@ trait RemoteMarshallingOps { */ def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = { val rep = app.provider.serialize(actor) - ActorRefProtocol.newBuilder.setAddress(rep.address).setHost(rep.hostname).setPort(rep.port).build + ActorRefProtocol.newBuilder.setHost(rep.hostname).setPort(rep.port).setPath(rep.path).build } def createRemoteMessageProtocolBuilder( diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index eca7372f3f..90e3b20902 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -51,15 +51,19 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider def defaultDispatcher = app.dispatcher def defaultTimeout = app.AkkaConfig.ActorTimeout - def actorOf(props: Props, supervisor: ActorRef, address: String, systemService: Boolean): ActorRef = - if (systemService) local.actorOf(props, supervisor, address, systemService) + private[akka] def actorOf(props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef = + actorOf(props, supervisor, supervisor.path / name, systemService) + + private[akka] def actorOf(props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef = + if (systemService) local.actorOf(props, supervisor, path, systemService) else { + val name = path.name val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout? - actors.putIfAbsent(address, newFuture) match { // we won the race -- create the actor and resolve the future + actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future case null ⇒ val actor: ActorRef = try { - deployer.lookupDeploymentFor(address) match { + deployer.lookupDeploymentFor(path.toString) match { case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.RemoteScope(remoteAddresses))) ⇒ // FIXME move to AccrualFailureDetector as soon as we have the Gossiper up and running and remove the option to select impl in the akka.conf file since we only have one @@ -76,7 +80,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider if (isReplicaNode) { // we are on one of the replica node for this remote actor - local.actorOf(props, supervisor, address, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create + local.actorOf(props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create } else { // we are on the single "reference" node uses the remote actors on the replica nodes @@ -84,25 +88,25 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider case RouterType.Direct ⇒ if (remoteAddresses.size != 1) throw new ConfigurationException( "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) + .format(name, remoteAddresses.mkString(", "))) () ⇒ new DirectRouter case RouterType.Random ⇒ if (remoteAddresses.size < 1) throw new ConfigurationException( "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) + .format(name, remoteAddresses.mkString(", "))) () ⇒ new RandomRouter case RouterType.RoundRobin ⇒ if (remoteAddresses.size < 1) throw new ConfigurationException( "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) + .format(name, remoteAddresses.mkString(", "))) () ⇒ new RoundRobinRouter case RouterType.ScatterGather ⇒ if (remoteAddresses.size < 1) throw new ConfigurationException( "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) + .format(name, remoteAddresses.mkString(", "))) () ⇒ new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout) case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") @@ -113,17 +117,17 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider val connections = (Map.empty[InetSocketAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒ val inetAddr = new InetSocketAddress(a.hostname, a.port) - conns + (inetAddr -> RemoteActorRef(remote.server, inetAddr, address, None)) + conns + (inetAddr -> RemoteActorRef(remote.server, inetAddr, path, None)) } val connectionManager = new RemoteConnectionManager(app, remote, connections) - connections.keys foreach { useActorOnNode(_, address, props.creator) } + connections.keys foreach { useActorOnNode(_, path.toString, props.creator) } - actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, address) + actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name) } - case deploy ⇒ local.actorOf(props, supervisor, address, systemService) + case deploy ⇒ local.actorOf(props, supervisor, name, systemService) } } catch { case e: Exception ⇒ @@ -134,7 +138,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider // actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later newFuture completeWithResult actor - actors.replace(address, newFuture, actor) + actors.replace(path.toString, newFuture, actor) actor case actor: ActorRef ⇒ actor case future: Future[_] ⇒ future.get.asInstanceOf[ActorRef] @@ -145,13 +149,13 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider * Copied from LocalActorRefProvider... */ // FIXME: implement supervision - def actorOf(props: RoutedProps, supervisor: ActorRef, address: String): ActorRef = { - if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router") - new RoutedActorRef(app, props, address) + def actorOf(props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = { + if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router") + new RoutedActorRef(app, props, supervisor, name) } - def actorFor(address: String): Option[ActorRef] = actors.get(address) match { - case null ⇒ local.actorFor(address) + def actorFor(path: Iterable[String]): Option[ActorRef] = actors.get(ActorPath.join(path)) match { + case null ⇒ local.actorFor(path) case actor: ActorRef ⇒ Some(actor) case future: Future[_] ⇒ Some(future.get.asInstanceOf[ActorRef]) } @@ -162,27 +166,28 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider /** * Returns true if the actor was in the provider's cache and evicted successfully, else false. */ - private[akka] def evict(address: String): Boolean = actors.remove(address) ne null + private[akka] def evict(path: String): Boolean = actors.remove(path) ne null + private[akka] def serialize(actor: ActorRef): SerializedActorRef = actor match { - case r: RemoteActorRef ⇒ new SerializedActorRef(actor.address, r.remoteAddress) + case r: RemoteActorRef ⇒ new SerializedActorRef(r.remoteAddress, actor.path.toString) case other ⇒ local.serialize(actor) } private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = { if (optimizeLocalScoped_? && (actor.hostname == app.hostname || actor.hostname == app.defaultAddress.getHostName) && actor.port == app.port) { - local.actorFor(actor.address) + local.actorFor(ActorPath.split(actor.path)) } else { val remoteInetSocketAddress = new InetSocketAddress(actor.hostname, actor.port) //FIXME Drop the InetSocketAddresses and use RemoteAddress - log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", app.defaultAddress, actor.address, remoteInetSocketAddress) - Some(RemoteActorRef(remote.server, remoteInetSocketAddress, actor.address, None)) //Should it be None here + log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", app.defaultAddress, actor.path, remoteInetSocketAddress) + Some(RemoteActorRef(remote.server, remoteInetSocketAddress, ActorPath(app, actor.path), None)) //Should it be None here } } /** * Using (checking out) actor on a specific node. */ - def useActorOnNode(remoteAddress: InetSocketAddress, actorAddress: String, actorFactory: () ⇒ Actor) { - log.debug("[{}] Instantiating Actor [{}] on node [{}]", app.defaultAddress, actorAddress, remoteAddress) + def useActorOnNode(remoteAddress: InetSocketAddress, actorPath: String, actorFactory: () ⇒ Actor) { + log.debug("[{}] Instantiating Actor [{}] on node [{}]", app.defaultAddress, actorPath, remoteAddress) val actorFactoryBytes = app.serialization.serialize(actorFactory) match { @@ -192,11 +197,11 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider val command = RemoteSystemDaemonMessageProtocol.newBuilder .setMessageType(USE) - .setActorAddress(actorAddress) + .setActorPath(actorPath) .setPayload(ByteString.copyFrom(actorFactoryBytes)) .build() - val connectionFactory = () ⇒ deserialize(new SerializedActorRef(remote.remoteDaemonServiceName, remoteAddress)).get + val connectionFactory = () ⇒ deserialize(new SerializedActorRef(remoteAddress, remote.remoteDaemon.path.toString)).get // try to get the connection for the remote address, if not already there then create it val connection = remoteDaemonConnectionManager.putIfAbsent(remoteAddress, connectionFactory) @@ -245,12 +250,17 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider private[akka] case class RemoteActorRef private[akka] ( remote: RemoteSupport, remoteAddress: InetSocketAddress, - address: String, + path: ActorPath, loader: Option[ClassLoader]) extends ActorRef with ScalaActorRef { + @volatile private var running: Boolean = true + def name = path.name + + def address = remoteAddress.getAddress.getHostAddress + ":" + remoteAddress.getPort + path.toString + def isShutdown: Boolean = !running protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index f76f9d072a..f088cebbca 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -74,7 +74,7 @@ class RemoteConnectionManager( case (`from`, actorRef) ⇒ changed = true //actorRef.stop() - (to, newConnection(actorRef.address, to)) + (to, newConnection(to, actorRef.path)) case other ⇒ other } @@ -149,7 +149,7 @@ class RemoteConnectionManager( } } - private[remote] def newConnection(actorAddress: String, inetSocketAddress: InetSocketAddress) = { - RemoteActorRef(remote.server, inetSocketAddress, actorAddress, None) + private[remote] def newConnection(inetSocketAddress: InetSocketAddress, actorPath: ActorPath) = { + RemoteActorRef(remote.server, inetSocketAddress, actorPath, None) } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode1.conf b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode1.conf index a7db5ca6e6..a10ac35b1c 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode1.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode1.conf @@ -1,5 +1,5 @@ akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.router = "direct" -akka.actor.deployment.service-hello.nr-of-instances = 1 -akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991"] +akka.actor.deployment./app/service-hello.router = "direct" +akka.actor.deployment./app/service-hello.nr-of-instances = 1 +akka.actor.deployment./app/service-hello.remote.nodes = ["localhost:9991"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode2.conf b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode2.conf index a7db5ca6e6..a10ac35b1c 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode2.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode2.conf @@ -1,5 +1,5 @@ akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.router = "direct" -akka.actor.deployment.service-hello.nr-of-instances = 1 -akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991"] +akka.actor.deployment./app/service-hello.router = "direct" +akka.actor.deployment./app/service-hello.nr-of-instances = 1 +akka.actor.deployment./app/service-hello.remote.nodes = ["localhost:9991"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf index 8281319e9a..ab653b70ed 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf @@ -1,3 +1,3 @@ akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991"] +akka.actor.deployment./app/service-hello.remote.nodes = ["localhost:9991"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf index 8281319e9a..ab653b70ed 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf @@ -1,3 +1,3 @@ akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991"] +akka.actor.deployment./app/service-hello.remote.nodes = ["localhost:9991"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode1.conf b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode1.conf index 4a171ba96f..e06f4a67ca 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode1.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode1.conf @@ -1,5 +1,5 @@ akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.router = "random" -akka.actor.deployment.service-hello.nr-of-instances = 3 -akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] +akka.actor.deployment./app/service-hello.router = "random" +akka.actor.deployment./app/service-hello.nr-of-instances = 3 +akka.actor.deployment./app/service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode2.conf b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode2.conf index 4a171ba96f..e06f4a67ca 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode2.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode2.conf @@ -1,5 +1,5 @@ akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.router = "random" -akka.actor.deployment.service-hello.nr-of-instances = 3 -akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] +akka.actor.deployment./app/service-hello.router = "random" +akka.actor.deployment./app/service-hello.nr-of-instances = 3 +akka.actor.deployment./app/service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode3.conf b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode3.conf index 4a171ba96f..e06f4a67ca 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode3.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode3.conf @@ -1,5 +1,5 @@ akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.router = "random" -akka.actor.deployment.service-hello.nr-of-instances = 3 -akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] +akka.actor.deployment./app/service-hello.router = "random" +akka.actor.deployment./app/service-hello.nr-of-instances = 3 +akka.actor.deployment./app/service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode4.conf b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode4.conf index 4a171ba96f..e06f4a67ca 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode4.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode4.conf @@ -1,5 +1,5 @@ akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.router = "random" -akka.actor.deployment.service-hello.nr-of-instances = 3 -akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] +akka.actor.deployment./app/service-hello.router = "random" +akka.actor.deployment./app/service-hello.nr-of-instances = 3 +akka.actor.deployment./app/service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode1.conf b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode1.conf index 08c0dc70d2..1b833c6509 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode1.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode1.conf @@ -1,5 +1,5 @@ akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.router = "round-robin" -akka.actor.deployment.service-hello.nr-of-instances = 3 -akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] +akka.actor.deployment./app/service-hello.router = "round-robin" +akka.actor.deployment./app/service-hello.nr-of-instances = 3 +akka.actor.deployment./app/service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode2.conf b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode2.conf index 08c0dc70d2..1b833c6509 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode2.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode2.conf @@ -1,5 +1,5 @@ akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.router = "round-robin" -akka.actor.deployment.service-hello.nr-of-instances = 3 -akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] +akka.actor.deployment./app/service-hello.router = "round-robin" +akka.actor.deployment./app/service-hello.nr-of-instances = 3 +akka.actor.deployment./app/service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode3.conf b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode3.conf index 08c0dc70d2..1b833c6509 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode3.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode3.conf @@ -1,5 +1,5 @@ akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.router = "round-robin" -akka.actor.deployment.service-hello.nr-of-instances = 3 -akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] +akka.actor.deployment./app/service-hello.router = "round-robin" +akka.actor.deployment./app/service-hello.nr-of-instances = 3 +akka.actor.deployment./app/service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode4.conf b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode4.conf index 08c0dc70d2..1b833c6509 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode4.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode4.conf @@ -1,5 +1,5 @@ akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.router = "round-robin" -akka.actor.deployment.service-hello.nr-of-instances = 3 -akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] +akka.actor.deployment./app/service-hello.router = "round-robin" +akka.actor.deployment./app/service-hello.nr-of-instances = 3 +akka.actor.deployment./app/service-hello.remote.nodes = ["localhost:9991","localhost:9992","localhost:9993"] diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index c6827a3547..a97e5591d2 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -18,8 +18,8 @@ import akka.AkkaApplication * @author Roland Kuhn * @since 1.1 */ -class TestActorRef[T <: Actor](_app: AkkaApplication, _props: Props, _supervisor: ActorRef, address: String) - extends LocalActorRef(_app, _props.withDispatcher(new CallingThreadDispatcher(_app)), _supervisor, address, false) { +class TestActorRef[T <: Actor](_app: AkkaApplication, _props: Props, _supervisor: ActorRef, name: String) + extends LocalActorRef(_app, _props.withDispatcher(new CallingThreadDispatcher(_app)), _supervisor, _supervisor.path / name, false) { /** * Directly inject messages into actor receive behavior. Any exceptions * thrown will be available to you, while still being able to use @@ -34,27 +34,32 @@ class TestActorRef[T <: Actor](_app: AkkaApplication, _props: Props, _supervisor */ def underlyingActor: T = underlyingActorInstance.asInstanceOf[T] - override def toString = "TestActor[" + address + ":" + uuid + "]" + override def toString = "TestActor[" + address + "]" - override def equals(other: Any) = other.isInstanceOf[TestActorRef[_]] && other.asInstanceOf[TestActorRef[_]].uuid == uuid + override def equals(other: Any) = other.isInstanceOf[TestActorRef[_]] && other.asInstanceOf[TestActorRef[_]].address == address } object TestActorRef { - def apply[T <: Actor](factory: ⇒ T)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](Props(factory), Props.randomAddress) + def apply[T <: Actor](factory: ⇒ T)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](Props(factory), Props.randomName) - def apply[T <: Actor](factory: ⇒ T, address: String)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](Props(factory), address) + def apply[T <: Actor](factory: ⇒ T, name: String)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](Props(factory), name) - def apply[T <: Actor](props: Props)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](props, Props.randomAddress) + def apply[T <: Actor](props: Props)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](props, Props.randomName) - def apply[T <: Actor](props: Props, address: String)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](props, app.guardian, address) + def apply[T <: Actor](props: Props, name: String)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](props, app.guardian, name) - def apply[T <: Actor](props: Props, supervisor: ActorRef, address: String)(implicit app: AkkaApplication): TestActorRef[T] = - new TestActorRef(app, props, supervisor, address) + def apply[T <: Actor](props: Props, supervisor: ActorRef, givenName: String)(implicit app: AkkaApplication): TestActorRef[T] = { + val name: String = givenName match { + case null | Props.randomName ⇒ newUuid.toString + case given ⇒ given + } + new TestActorRef(app, props, supervisor, name) + } - def apply[T <: Actor](implicit m: Manifest[T], app: AkkaApplication): TestActorRef[T] = apply[T](Props.randomAddress) + def apply[T <: Actor](implicit m: Manifest[T], app: AkkaApplication): TestActorRef[T] = apply[T](Props.randomName) - def apply[T <: Actor](address: String)(implicit m: Manifest[T], app: AkkaApplication): TestActorRef[T] = apply[T](Props({ + def apply[T <: Actor](name: String)(implicit m: Manifest[T], app: AkkaApplication): TestActorRef[T] = apply[T](Props({ import ReflectiveAccess.{ createInstance, noParams, noArgs } createInstance[T](m.erasure, noParams, noArgs) match { case Right(value) ⇒ value @@ -64,5 +69,5 @@ object TestActorRef { "\nif so put it outside the class/trait, f.e. in a companion object," + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", exception) } - }), address) + }), name) } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala index 6ea2e058c6..772f390a68 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala @@ -34,8 +34,8 @@ import akka.AkkaApplication * @author Roland Kuhn * @since 1.2 */ -class TestFSMRef[S, D, T <: Actor](app: AkkaApplication, props: Props, supervisor: ActorRef, address: String)(implicit ev: T <:< FSM[S, D]) - extends TestActorRef(app, props, supervisor, address) { +class TestFSMRef[S, D, T <: Actor](app: AkkaApplication, props: Props, supervisor: ActorRef, name: String)(implicit ev: T <:< FSM[S, D]) + extends TestActorRef(app, props, supervisor, name) { private def fsm: T = underlyingActor @@ -81,8 +81,8 @@ class TestFSMRef[S, D, T <: Actor](app: AkkaApplication, props: Props, superviso object TestFSMRef { def apply[S, D, T <: Actor](factory: ⇒ T)(implicit ev: T <:< FSM[S, D], app: AkkaApplication): TestFSMRef[S, D, T] = - new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, Props.randomAddress) + new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, Props.randomName) - def apply[S, D, T <: Actor](factory: ⇒ T, address: String)(implicit ev: T <:< FSM[S, D], app: AkkaApplication): TestFSMRef[S, D, T] = - new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, address) + def apply[S, D, T <: Actor](factory: ⇒ T, name: String)(implicit ev: T <:< FSM[S, D], app: AkkaApplication): TestFSMRef[S, D, T] = + new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, name) } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index a44905a3b5..19376ea313 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -64,7 +64,7 @@ akka { deployment { - service-ping { # deployment id pattern + /app/service-ping { # deployment id pattern router = "round-robin" # routing (load-balance) scheme to use # available: "direct", "round-robin", "random", "scatter-gather"