New Remoting implementation (iteration 3) #2053

- Asynchronous lockless remoting
 - Pluggable transport drivers
 - Multiple transport support
 - Simplified lifecycle events
 - Support for netty based TCP, SSL+TCP and UDP support
 - Updated Akka protocol with soft-state connections and failure detection
 - Refactored failure detectors (currently duplicated from cluster)
This commit is contained in:
Endre Sándor Varga 2012-09-12 11:18:42 +02:00
parent 3ee7dbcc45
commit 5f9253b79e
39 changed files with 4682 additions and 132 deletions

View file

@ -77,7 +77,7 @@ class RemoteActorRefProvider(
}).get
}
_log = Logging(eventStream, "RemoteActorRefProvider(" + transport.address + ")")
_log = Logging(eventStream, "RemoteActorRefProvider")
// this enables reception of remote requests
_transport.start()
@ -108,15 +108,15 @@ class RemoteActorRefProvider(
* address below remote, including the current systems identification
* as sys@host:port (typically; it will use whatever the remote
* transport uses). This means that on a path up an actor tree each node
* change introduces one layer or remote/sys@host:port/ within the URI.
* change introduces one layer or remote/scheme/sys@host:port/ within the URI.
*
* Example:
*
* akka://sys@home:1234/remote/sys@remote:6667/remote/sys@other:3333/user/a/b/c
* akka://sys@home:1234/remote/akka/sys@remote:6667/remote/akka/sys@other:3333/user/a/b/c
*
* means that the logical parent originates from sys@other:3333 with
* one child (may be a or b) being deployed on sys@remote:6667 and
* finally either b or c being created on sys@home:1234, where
* means that the logical parent originates from akka://sys@other:3333 with
* one child (may be a or b) being deployed on akka://sys@remote:6667 and
* finally either b or c being created on akka://sys@home:1234, where
* this whole thing actually resides. Thus, the logical path is
* /user/a/b/c and the physical path contains all remote placement
* information.
@ -129,7 +129,7 @@ class RemoteActorRefProvider(
def lookupRemotes(p: Iterable[String]): Option[Deploy] = {
p.headOption match {
case None None
case Some("remote") lookupRemotes(p.drop(2))
case Some("remote") lookupRemotes(p.drop(3))
case Some("user") deployer.lookup(p.drop(1))
case Some(_) None
}
@ -154,11 +154,13 @@ class RemoteActorRefProvider(
Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) b withFallback a) match {
case d @ Deploy(_, _, _, RemoteScope(addr))
if (addr == rootPath.address || addr == transport.address) {
if (addr == rootPath.address || transport.addresses(addr)) {
local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async)
} else {
val rpath = RootActorPath(addr) / "remote" / transport.address.hostPort / path.elements
new RemoteActorRef(this, transport, rpath, supervisor, Some(props), Some(d))
val localAddress = transport.localAddressForRemote(addr)
val rpath = RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements
useActorOnNode(rpath, props, d, supervisor)
new RemoteActorRef(this, transport, localAddress, rpath, supervisor, Some(props), Some(d))
}
case _ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
@ -167,13 +169,13 @@ class RemoteActorRefProvider(
}
def actorFor(path: ActorPath): InternalActorRef =
if (path.address == rootPath.address || path.address == transport.address) actorFor(rootGuardian, path.elements)
else new RemoteActorRef(this, transport, path, Nobody, props = None, deploy = None)
if (path.address == rootPath.address || transport.addresses(path.address)) actorFor(rootGuardian, path.elements)
else new RemoteActorRef(this, transport, transport.localAddressForRemote(path.address), path, Nobody, props = None, deploy = None)
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
case ActorPathExtractor(address, elems)
if (address == rootPath.address || address == transport.address) actorFor(rootGuardian, elems)
else new RemoteActorRef(this, transport, new RootActorPath(address) / elems, Nobody, props = None, deploy = None)
if (address == rootPath.address || transport.addresses(address)) actorFor(rootGuardian, elems)
else new RemoteActorRef(this, transport, transport.localAddressForRemote(address), new RootActorPath(address) / elems, Nobody, props = None, deploy = None)
case _ local.actorFor(ref, path)
}
@ -190,12 +192,11 @@ class RemoteActorRefProvider(
}
def getExternalAddressFor(addr: Address): Option[Address] = {
val ta = transport.address
val ra = rootPath.address
addr match {
case `ta` | `ra` Some(rootPath.address)
case Address("akka", _, Some(_), Some(_)) Some(transport.address)
case _ None
case a if (a eq ra) || transport.addresses(a) Some(rootPath.address)
case Address(_, _, Some(_), Some(_)) Some(transport.localAddressForRemote(addr))
case _ None
}
}
}
@ -211,6 +212,7 @@ private[akka] trait RemoteRef extends ActorRefScope {
private[akka] class RemoteActorRef private[akka] (
val provider: RemoteActorRefProvider,
remote: RemoteTransport,
val localAddressToUse: Address,
val path: ActorPath,
val getParent: InternalActorRef,
props: Option[Props],
@ -222,7 +224,7 @@ private[akka] class RemoteActorRef private[akka] (
s.headOption match {
case None this
case Some("..") getParent getChild name
case _ new RemoteActorRef(provider, remote, path / s, Nobody, props = None, deploy = None)
case _ new RemoteActorRef(provider, remote, localAddressToUse, path / s, Nobody, props = None, deploy = None)
}
}
@ -256,4 +258,4 @@ private[akka] class RemoteActorRef private[akka] (
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = SerializedActorRef(path)
}
}