first step: sanitize ActorPath interface
- remove references to ActorSystem - make ChildActorPath light-weight (name & parent plus tailrec methods for traversing towards root, e.g. toString) - string rep is full URI always (akka://system-name@host:port/user/bla)
This commit is contained in:
parent
96598704c8
commit
dad1c98c48
11 changed files with 93 additions and 57 deletions
|
|
@ -284,8 +284,8 @@ class ActorRefSpec extends AkkaSpec {
|
|||
val baos = new ByteArrayOutputStream(8192 * 32)
|
||||
val out = new ObjectOutputStream(baos)
|
||||
|
||||
val addr = system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress
|
||||
val serialized = SerializedActorRef(addr.hostname, addr.port, "/this/path/does/not/exist")
|
||||
val addr = system.asInstanceOf[ActorSystemImpl].provider.rootPath.address
|
||||
val serialized = SerializedActorRef(addr.hostPort, 0, "/this/path/does/not/exist")
|
||||
|
||||
out.writeObject(serialized)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,10 +1,8 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import akka.remote.RemoteAddress
|
||||
import scala.annotation.tailrec
|
||||
|
||||
object ActorPath {
|
||||
final val separator = "/"
|
||||
|
|
@ -60,9 +58,10 @@ object ActorPath {
|
|||
*/
|
||||
trait ActorPath {
|
||||
/**
|
||||
* The RemoteAddress for this path.
|
||||
* The Address under which this path can be reached; walks up the tree to
|
||||
* the RootActorPath.
|
||||
*/
|
||||
def remoteAddress: RemoteAddress
|
||||
def address: Address = root.address
|
||||
|
||||
/**
|
||||
* The name of the actor that this path refers to.
|
||||
|
|
@ -84,49 +83,64 @@ trait ActorPath {
|
|||
*/
|
||||
def /(child: Iterable[String]): ActorPath = (this /: child)(_ / _)
|
||||
|
||||
/**
|
||||
* String representation of this path. Different from toString for root path.
|
||||
*/
|
||||
def string: String
|
||||
|
||||
/**
|
||||
* Sequence of names for this path.
|
||||
*/
|
||||
def path: Iterable[String]
|
||||
def pathElements: Iterable[String]
|
||||
|
||||
/**
|
||||
* Is this the root path?
|
||||
* Walk up the tree to obtain and return the RootActorPath.
|
||||
*/
|
||||
def isRoot: Boolean
|
||||
def root: RootActorPath
|
||||
}
|
||||
|
||||
class RootActorPath(val remoteAddress: RemoteAddress) extends ActorPath {
|
||||
|
||||
def name: String = "/"
|
||||
/**
|
||||
* Root of the hierarchy of ActorPaths. There is exactly root per ActorSystem
|
||||
* and node (for remote-enabled or clustered systems).
|
||||
*/
|
||||
class RootActorPath(override val address: Address, val name: String = ActorPath.separator) extends ActorPath {
|
||||
|
||||
def parent: ActorPath = this
|
||||
|
||||
def /(child: String): ActorPath = new ChildActorPath(remoteAddress, this, child)
|
||||
def root: RootActorPath = this
|
||||
|
||||
def string: String = ""
|
||||
def /(child: String): ActorPath = new ChildActorPath(this, child)
|
||||
|
||||
def path: Iterable[String] = Iterable.empty
|
||||
def pathElements: Iterable[String] = Iterable.empty
|
||||
|
||||
def isRoot: Boolean = true
|
||||
|
||||
override def toString = ActorPath.separator
|
||||
override val toString = address + ActorPath.separator
|
||||
}
|
||||
|
||||
class ChildActorPath(val remoteAddress: RemoteAddress, val parent: ActorPath, val name: String) extends ActorPath {
|
||||
class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath {
|
||||
|
||||
def /(child: String): ActorPath = new ChildActorPath(remoteAddress, this, child)
|
||||
def /(child: String): ActorPath = new ChildActorPath(this, child)
|
||||
|
||||
def string: String = parent.string + ActorPath.separator + name
|
||||
def pathElements: Iterable[String] = {
|
||||
@tailrec
|
||||
def rec(p: ActorPath, acc: List[String]): Iterable[String] = p match {
|
||||
case r: RootActorPath ⇒ acc
|
||||
case _ ⇒ rec(p.parent, p.name :: acc)
|
||||
}
|
||||
rec(this, Nil)
|
||||
}
|
||||
|
||||
def path: Iterable[String] = parent.path ++ Iterable(name)
|
||||
def root = {
|
||||
@tailrec
|
||||
def rec(p: ActorPath): RootActorPath = p match {
|
||||
case r: RootActorPath ⇒ r
|
||||
case _ ⇒ rec(p.parent)
|
||||
}
|
||||
rec(this)
|
||||
}
|
||||
|
||||
def isRoot: Boolean = false
|
||||
|
||||
override def toString = string
|
||||
override def toString = {
|
||||
@tailrec
|
||||
def rec(p: ActorPath, s: String): String = p match {
|
||||
case r: RootActorPath ⇒ r + s
|
||||
case _ if s.isEmpty ⇒ rec(p.parent, name)
|
||||
case _ ⇒ rec(p.parent, p.name + ActorPath.separator + s)
|
||||
}
|
||||
rec(this, "")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -303,10 +303,12 @@ trait ScalaActorRef { ref: ActorRef ⇒
|
|||
/**
|
||||
* Memento pattern for serializing ActorRefs transparently
|
||||
*/
|
||||
|
||||
// FIXME: remove and replace by ActorPath.toString
|
||||
case class SerializedActorRef(hostname: String, port: Int, path: String) {
|
||||
import akka.serialization.Serialization.system
|
||||
|
||||
// FIXME this is broken, but see above
|
||||
def this(address: Address, path: String) = this(address.hostPort, 0, path)
|
||||
def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.hostname, remoteAddress.port, path)
|
||||
def this(remoteAddress: InetSocketAddress, path: String) = this(remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, path) //TODO FIXME REMOVE
|
||||
|
||||
|
|
|
|||
|
|
@ -130,7 +130,7 @@ trait ActorRefFactory {
|
|||
|
||||
def actorOf(creator: UntypedActorFactory): ActorRef = actorOf(Props(() ⇒ creator.create()))
|
||||
|
||||
def actorFor(path: ActorPath): Option[ActorRef] = actorFor(path.path)
|
||||
def actorFor(path: ActorPath): Option[ActorRef] = actorFor(path.pathElements)
|
||||
|
||||
def actorFor(path: String): Option[ActorRef] = actorFor(ActorPath.split(path))
|
||||
|
||||
|
|
@ -357,7 +357,7 @@ class LocalActorRefProvider(
|
|||
}
|
||||
|
||||
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(ActorPath.split(actor.path))
|
||||
private[akka] def serialize(actor: ActorRef): SerializedActorRef = new SerializedActorRef(rootPath.remoteAddress, actor.path.toString)
|
||||
private[akka] def serialize(actor: ActorRef): SerializedActorRef = new SerializedActorRef(rootPath.address, actor.path.toString)
|
||||
|
||||
private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch
|
||||
|
||||
|
|
|
|||
21
akka-actor/src/main/scala/akka/actor/Address.scala
Normal file
21
akka-actor/src/main/scala/akka/actor/Address.scala
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
/**
|
||||
* The address specifies the physical location under which an Actor can be
|
||||
* reached. Examples are local addresses, identified by the ActorSystem’s
|
||||
* name, and remote addresses, identified by protocol, host and port.
|
||||
*/
|
||||
abstract class Address {
|
||||
def protocol: String
|
||||
def hostPort: String
|
||||
@transient
|
||||
override lazy val toString = protocol + "://" + hostPort
|
||||
}
|
||||
|
||||
case class LocalAddress(systemName: String) extends Address {
|
||||
def protocol = "akka"
|
||||
def hostPort = systemName
|
||||
}
|
||||
|
|
@ -28,10 +28,10 @@ object RemoteAddress {
|
|||
|
||||
object LocalOnly extends RemoteAddress(0, "local")
|
||||
|
||||
case class RemoteAddress private[akka] (port: Int, hostname: String) {
|
||||
case class RemoteAddress private[akka] (port: Int, hostname: String) extends Address {
|
||||
def protocol = "akka"
|
||||
@transient
|
||||
override lazy val toString = "" + hostname + ":" + port
|
||||
|
||||
lazy val hostPort = hostname + ":" + port
|
||||
}
|
||||
|
||||
class RemoteException(message: String) extends AkkaException(message)
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ abstract class DurableMailbox(owner: ActorCell) extends Mailbox(owner) with Defa
|
|||
|
||||
def system = owner.system
|
||||
def ownerPath = owner.self.path
|
||||
val ownerPathString = ownerPath.path.mkString("/")
|
||||
val ownerPathString = ownerPath.pathElements.mkString("/")
|
||||
val name = "mailbox_" + Name.replaceAllIn(ownerPathString, "_")
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ class Gossiper(remote: Remote) {
|
|||
private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[RemoteAddress, ActorRef])
|
||||
private val seeds = Set(address) // FIXME read in list of seeds from config
|
||||
|
||||
private val address = system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress
|
||||
private val address = remote.remoteAddress
|
||||
private val nodeFingerprint = address.##
|
||||
|
||||
private val random = SecureRandom.getInstance("SHA1PRNG")
|
||||
|
|
|
|||
|
|
@ -71,22 +71,21 @@ class Remote(val system: ActorSystemImpl, val nodename: String) {
|
|||
|
||||
lazy val eventStream = new NetworkEventStream(system)
|
||||
|
||||
lazy val server: RemoteSupport = {
|
||||
val remote = new akka.remote.netty.NettyRemoteSupport(system)
|
||||
@volatile
|
||||
private var _server: RemoteSupport = _
|
||||
def server = _server
|
||||
|
||||
def start(): Unit = {
|
||||
val remote = new akka.remote.netty.NettyRemoteSupport(system, this)
|
||||
remote.start() //TODO FIXME Any application loader here?
|
||||
|
||||
system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent])
|
||||
system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent])
|
||||
|
||||
// TODO actually register this provider in system in remote mode
|
||||
//provider.register(ActorRefProvider.RemoteProvider, new RemoteActorRefProvider)
|
||||
remote
|
||||
}
|
||||
_server = remote
|
||||
|
||||
def start(): Unit = {
|
||||
val serverAddress = server.system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress //Force init of server
|
||||
val daemonAddress = remoteDaemon.address //Force init of daemon
|
||||
log.info("Starting remote server on [{}] and starting remoteDaemon with address [{}]", serverAddress, daemonAddress)
|
||||
log.info("Starting remote server on [{}] and starting remoteDaemon with address [{}]", remoteAddress, daemonAddress)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ class RemoteActorRefProvider(
|
|||
// case FailureDetectorType.Custom(implClass) ⇒ FailureDetector.createCustomFailureDetector(implClass)
|
||||
// }
|
||||
|
||||
def isReplicaNode: Boolean = remoteAddresses exists { _ == rootPath.remoteAddress }
|
||||
def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress }
|
||||
|
||||
//system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
|
||||
|
||||
|
|
@ -204,10 +204,10 @@ class RemoteActorRefProvider(
|
|||
|
||||
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = {
|
||||
val remoteAddress = RemoteAddress(actor.hostname, actor.port)
|
||||
if (optimizeLocalScoped_? && remoteAddress == rootPath.remoteAddress) {
|
||||
if (optimizeLocalScoped_? && remoteAddress == remote.remoteAddress) {
|
||||
local.actorFor(ActorPath.split(actor.path))
|
||||
} else {
|
||||
log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", rootPath.remoteAddress, actor.path, remoteAddress)
|
||||
log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", remote.remoteAddress, actor.path, remoteAddress)
|
||||
Some(RemoteActorRef(remote.system.provider, remote.server, remoteAddress, rootPath / ActorPath.split(actor.path), None)) //Should it be None here
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -95,8 +95,8 @@ abstract class RemoteClient private[akka] (
|
|||
}
|
||||
|
||||
class PassiveRemoteClient(val currentChannel: Channel,
|
||||
remoteSupport: NettyRemoteSupport,
|
||||
remoteAddress: RemoteAddress)
|
||||
remoteSupport: NettyRemoteSupport,
|
||||
remoteAddress: RemoteAddress)
|
||||
extends RemoteClient(remoteSupport, remoteAddress) {
|
||||
|
||||
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn {
|
||||
|
|
@ -141,7 +141,7 @@ class ActiveRemoteClient private[akka] (
|
|||
|
||||
def currentChannel = connection.getChannel
|
||||
|
||||
private val senderRemoteAddress = remoteSupport.system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress
|
||||
private val senderRemoteAddress = remoteSupport.remote.remoteAddress
|
||||
|
||||
/**
|
||||
* Connect to remote server.
|
||||
|
|
@ -355,7 +355,7 @@ class ActiveRemoteClientHandler(
|
|||
/**
|
||||
* Provides the implementation of the Netty remote support
|
||||
*/
|
||||
class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps {
|
||||
class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends RemoteSupport(_system) with RemoteMarshallingOps {
|
||||
val log = Logging(system, "NettyRemoteSupport")
|
||||
|
||||
val serverSettings = RemoteExtension(system).settings.serverSettings
|
||||
|
|
@ -458,7 +458,7 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi
|
|||
|
||||
def name = currentServer.get match {
|
||||
case Some(server) ⇒ server.name
|
||||
case None ⇒ "Non-running NettyRemoteServer@" + system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress
|
||||
case None ⇒ "Non-running NettyRemoteServer@" + remote.remoteAddress
|
||||
}
|
||||
|
||||
private val _isRunning = new Switch(false)
|
||||
|
|
@ -493,7 +493,7 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio
|
|||
val log = Logging(remoteSupport.system, "NettyRemoteServer")
|
||||
import remoteSupport.serverSettings._
|
||||
|
||||
val address = remoteSupport.system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress
|
||||
val address = remoteSupport.remote.remoteAddress
|
||||
|
||||
val name = "NettyRemoteServer@" + address
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue