diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 1358e61c82..938204b993 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -284,8 +284,8 @@ class ActorRefSpec extends AkkaSpec { val baos = new ByteArrayOutputStream(8192 * 32) val out = new ObjectOutputStream(baos) - val addr = system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress - val serialized = SerializedActorRef(addr.hostname, addr.port, "/this/path/does/not/exist") + val addr = system.asInstanceOf[ActorSystemImpl].provider.rootPath.address + val serialized = SerializedActorRef(addr.hostPort, 0, "/this/path/does/not/exist") out.writeObject(serialized) diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala index a741705e4f..11e735218a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorPath.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala @@ -1,10 +1,8 @@ /** * Copyright (C) 2009-2011 Typesafe Inc. */ - package akka.actor - -import akka.remote.RemoteAddress +import scala.annotation.tailrec object ActorPath { final val separator = "/" @@ -60,9 +58,10 @@ object ActorPath { */ trait ActorPath { /** - * The RemoteAddress for this path. + * The Address under which this path can be reached; walks up the tree to + * the RootActorPath. */ - def remoteAddress: RemoteAddress + def address: Address = root.address /** * The name of the actor that this path refers to. @@ -84,49 +83,64 @@ trait ActorPath { */ def /(child: Iterable[String]): ActorPath = (this /: child)(_ / _) - /** - * String representation of this path. Different from toString for root path. - */ - def string: String - /** * Sequence of names for this path. */ - def path: Iterable[String] + def pathElements: Iterable[String] /** - * Is this the root path? + * Walk up the tree to obtain and return the RootActorPath. */ - def isRoot: Boolean + def root: RootActorPath } -class RootActorPath(val remoteAddress: RemoteAddress) extends ActorPath { - - def name: String = "/" +/** + * Root of the hierarchy of ActorPaths. There is exactly root per ActorSystem + * and node (for remote-enabled or clustered systems). + */ +class RootActorPath(override val address: Address, val name: String = ActorPath.separator) extends ActorPath { def parent: ActorPath = this - def /(child: String): ActorPath = new ChildActorPath(remoteAddress, this, child) + def root: RootActorPath = this - def string: String = "" + def /(child: String): ActorPath = new ChildActorPath(this, child) - def path: Iterable[String] = Iterable.empty + def pathElements: Iterable[String] = Iterable.empty - def isRoot: Boolean = true - - override def toString = ActorPath.separator + override val toString = address + ActorPath.separator } -class ChildActorPath(val remoteAddress: RemoteAddress, val parent: ActorPath, val name: String) extends ActorPath { +class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath { - def /(child: String): ActorPath = new ChildActorPath(remoteAddress, this, child) + def /(child: String): ActorPath = new ChildActorPath(this, child) - def string: String = parent.string + ActorPath.separator + name + def pathElements: Iterable[String] = { + @tailrec + def rec(p: ActorPath, acc: List[String]): Iterable[String] = p match { + case r: RootActorPath ⇒ acc + case _ ⇒ rec(p.parent, p.name :: acc) + } + rec(this, Nil) + } - def path: Iterable[String] = parent.path ++ Iterable(name) + def root = { + @tailrec + def rec(p: ActorPath): RootActorPath = p match { + case r: RootActorPath ⇒ r + case _ ⇒ rec(p.parent) + } + rec(this) + } - def isRoot: Boolean = false - - override def toString = string + override def toString = { + @tailrec + def rec(p: ActorPath, s: String): String = p match { + case r: RootActorPath ⇒ r + s + case _ if s.isEmpty ⇒ rec(p.parent, name) + case _ ⇒ rec(p.parent, p.name + ActorPath.separator + s) + } + rec(this, "") + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 35d36e3667..101c69a81b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -303,10 +303,12 @@ trait ScalaActorRef { ref: ActorRef ⇒ /** * Memento pattern for serializing ActorRefs transparently */ - +// FIXME: remove and replace by ActorPath.toString case class SerializedActorRef(hostname: String, port: Int, path: String) { import akka.serialization.Serialization.system + // FIXME this is broken, but see above + def this(address: Address, path: String) = this(address.hostPort, 0, path) def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.hostname, remoteAddress.port, path) def this(remoteAddress: InetSocketAddress, path: String) = this(remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, path) //TODO FIXME REMOVE diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 28fc383adf..4ebf29a835 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -130,7 +130,7 @@ trait ActorRefFactory { def actorOf(creator: UntypedActorFactory): ActorRef = actorOf(Props(() ⇒ creator.create())) - def actorFor(path: ActorPath): Option[ActorRef] = actorFor(path.path) + def actorFor(path: ActorPath): Option[ActorRef] = actorFor(path.pathElements) def actorFor(path: String): Option[ActorRef] = actorFor(ActorPath.split(path)) @@ -357,7 +357,7 @@ class LocalActorRefProvider( } private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(ActorPath.split(actor.path)) - private[akka] def serialize(actor: ActorRef): SerializedActorRef = new SerializedActorRef(rootPath.remoteAddress, actor.path.toString) + private[akka] def serialize(actor: ActorRef): SerializedActorRef = new SerializedActorRef(rootPath.address, actor.path.toString) private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch diff --git a/akka-actor/src/main/scala/akka/actor/Address.scala b/akka-actor/src/main/scala/akka/actor/Address.scala new file mode 100644 index 0000000000..c405702f6e --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/Address.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.actor + +/** + * The address specifies the physical location under which an Actor can be + * reached. Examples are local addresses, identified by the ActorSystem’s + * name, and remote addresses, identified by protocol, host and port. + */ +abstract class Address { + def protocol: String + def hostPort: String + @transient + override lazy val toString = protocol + "://" + hostPort +} + +case class LocalAddress(systemName: String) extends Address { + def protocol = "akka" + def hostPort = systemName +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index 7d33508f46..4acf0e37c7 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -28,10 +28,10 @@ object RemoteAddress { object LocalOnly extends RemoteAddress(0, "local") -case class RemoteAddress private[akka] (port: Int, hostname: String) { +case class RemoteAddress private[akka] (port: Int, hostname: String) extends Address { + def protocol = "akka" @transient - override lazy val toString = "" + hostname + ":" + port - + lazy val hostPort = hostname + ":" + port } class RemoteException(message: String) extends AkkaException(message) diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index b5d832d443..6ace3d84aa 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -41,7 +41,7 @@ abstract class DurableMailbox(owner: ActorCell) extends Mailbox(owner) with Defa def system = owner.system def ownerPath = owner.self.path - val ownerPathString = ownerPath.path.mkString("/") + val ownerPathString = ownerPath.pathElements.mkString("/") val name = "mailbox_" + Name.replaceAllIn(ownerPathString, "_") } diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 3735f6ceaf..755fc08df0 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -108,7 +108,7 @@ class Gossiper(remote: Remote) { private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[RemoteAddress, ActorRef]) private val seeds = Set(address) // FIXME read in list of seeds from config - private val address = system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress + private val address = remote.remoteAddress private val nodeFingerprint = address.## private val random = SecureRandom.getInstance("SHA1PRNG") diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 123304c314..ecdc5d39a1 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -71,22 +71,21 @@ class Remote(val system: ActorSystemImpl, val nodename: String) { lazy val eventStream = new NetworkEventStream(system) - lazy val server: RemoteSupport = { - val remote = new akka.remote.netty.NettyRemoteSupport(system) + @volatile + private var _server: RemoteSupport = _ + def server = _server + + def start(): Unit = { + val remote = new akka.remote.netty.NettyRemoteSupport(system, this) remote.start() //TODO FIXME Any application loader here? system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent]) system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) - // TODO actually register this provider in system in remote mode - //provider.register(ActorRefProvider.RemoteProvider, new RemoteActorRefProvider) - remote - } + _server = remote - def start(): Unit = { - val serverAddress = server.system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress //Force init of server val daemonAddress = remoteDaemon.address //Force init of daemon - log.info("Starting remote server on [{}] and starting remoteDaemon with address [{}]", serverAddress, daemonAddress) + log.info("Starting remote server on [{}] and starting remoteDaemon with address [{}]", remoteAddress, daemonAddress) } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 0e676546e6..a34f975d57 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -102,7 +102,7 @@ class RemoteActorRefProvider( // case FailureDetectorType.Custom(implClass) ⇒ FailureDetector.createCustomFailureDetector(implClass) // } - def isReplicaNode: Boolean = remoteAddresses exists { _ == rootPath.remoteAddress } + def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress } //system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode)) @@ -204,10 +204,10 @@ class RemoteActorRefProvider( private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = { val remoteAddress = RemoteAddress(actor.hostname, actor.port) - if (optimizeLocalScoped_? && remoteAddress == rootPath.remoteAddress) { + if (optimizeLocalScoped_? && remoteAddress == remote.remoteAddress) { local.actorFor(ActorPath.split(actor.path)) } else { - log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", rootPath.remoteAddress, actor.path, remoteAddress) + log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", remote.remoteAddress, actor.path, remoteAddress) Some(RemoteActorRef(remote.system.provider, remote.server, remoteAddress, rootPath / ActorPath.split(actor.path), None)) //Should it be None here } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index eedf58fd9e..497976cccf 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -95,8 +95,8 @@ abstract class RemoteClient private[akka] ( } class PassiveRemoteClient(val currentChannel: Channel, - remoteSupport: NettyRemoteSupport, - remoteAddress: RemoteAddress) + remoteSupport: NettyRemoteSupport, + remoteAddress: RemoteAddress) extends RemoteClient(remoteSupport, remoteAddress) { def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn { @@ -141,7 +141,7 @@ class ActiveRemoteClient private[akka] ( def currentChannel = connection.getChannel - private val senderRemoteAddress = remoteSupport.system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress + private val senderRemoteAddress = remoteSupport.remote.remoteAddress /** * Connect to remote server. @@ -355,7 +355,7 @@ class ActiveRemoteClientHandler( /** * Provides the implementation of the Netty remote support */ -class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps { +class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends RemoteSupport(_system) with RemoteMarshallingOps { val log = Logging(system, "NettyRemoteSupport") val serverSettings = RemoteExtension(system).settings.serverSettings @@ -458,7 +458,7 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi def name = currentServer.get match { case Some(server) ⇒ server.name - case None ⇒ "Non-running NettyRemoteServer@" + system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress + case None ⇒ "Non-running NettyRemoteServer@" + remote.remoteAddress } private val _isRunning = new Switch(false) @@ -493,7 +493,7 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio val log = Logging(remoteSupport.system, "NettyRemoteServer") import remoteSupport.serverSettings._ - val address = remoteSupport.system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress + val address = remoteSupport.remote.remoteAddress val name = "NettyRemoteServer@" + address