that was one hell of a FIXME

- fixed so that netty pipeline when generating addresses does not need
  to know the system name of the connecting client (which might differ
  from the local one, of course)
- this entailed differentiating between transport addresses and system
  addresses, which I took as an opportunity to separate everything out
  properly so that address schemas can easily be made pluggable
- made RemoteSupport generic in the address format it supports
- adapt netty stuff, and made everything else work with the most
  generic: ParsedTransportAddress
- did I mention that I statically separated unparsed from parsed
  addresses?
This commit is contained in:
Roland 2011-12-11 20:00:26 +01:00
parent 40654227b7
commit 7f0275bca2
18 changed files with 357 additions and 231 deletions

View file

@ -17,7 +17,7 @@ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import java.net.InetSocketAddress
import com.eaio.uuid.UUID
import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression, SerializationExtension }
import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher }
import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher, MessageDispatcher }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.dispatch.SystemMessage
@ -28,55 +28,89 @@ import scala.annotation.tailrec
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Remote(val system: ActorSystemImpl, val nodename: String, val remoteSettings: RemoteSettings) {
class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSettings) {
val log = Logging(system, "Remote")
import system._
import settings._
val serialization = SerializationExtension(system)
val remoteAddress = RemoteAddress(system.name, remoteSettings.serverSettings.Hostname, remoteSettings.serverSettings.Port)
// TODO make this really pluggable
val transports: TransportsMap = Map("akka" -> ((h, p) Right(RemoteNettyAddress(h, p))))
val remoteAddress: RemoteSystemAddress[ParsedTransportAddress] = {
val unparsedAddress = remoteSettings.serverSettings.URI match {
case RemoteAddressExtractor(a) a
case x throw new IllegalArgumentException("cannot parse URI " + x)
}
val parsed = unparsedAddress.parse(transports) match {
case Left(x) throw new IllegalArgumentException(x.transport.error)
case Right(x) x
}
parsed.copy(system = settings.name)
}
val failureDetector = new AccrualFailureDetector(remoteSettings.FailureDetectorThreshold, remoteSettings.FailureDetectorMaxSampleSize)
val computeGridDispatcher = dispatcherFactory.fromConfig("akka.remote.compute-grid-dispatcher")
@volatile
private var _serialization: Serialization = _
def serialization = _serialization
val remoteDaemon = new RemoteSystemDaemon(this, provider.rootPath / "remote", provider.rootGuardian, log)
@volatile
private var _computeGridDispatcher: MessageDispatcher = _
def computeGridDispatcher = _computeGridDispatcher
val remoteClientLifeCycleHandler = system.actorOf(Props(new Actor {
def receive = {
case RemoteClientError(cause, remote, address) remote.shutdownClientConnection(address)
case RemoteClientDisconnected(remote, address) remote.shutdownClientConnection(address)
case _ //ignore other
@volatile
private var _remoteDaemon: InternalActorRef = _
def remoteDaemon = _remoteDaemon
@volatile
private var _eventStream: NetworkEventStream = _
def eventStream = _eventStream
@volatile
private var _server: RemoteSupport[ParsedTransportAddress] = _
def server = _server
def init(system: ActorSystemImpl) = {
val log = Logging(system, "Remote")
_serialization = SerializationExtension(system)
_computeGridDispatcher = system.dispatcherFactory.fromConfig("akka.remote.compute-grid-dispatcher")
_remoteDaemon = new RemoteSystemDaemon(system, this, system.provider.rootPath / "remote", system.provider.rootGuardian, log)
_eventStream = new NetworkEventStream(system)
_server = {
val arguments = Seq(
classOf[ActorSystemImpl] -> system,
classOf[Remote] -> this,
classOf[RemoteSystemAddress[_ <: ParsedTransportAddress]] -> remoteAddress)
val types: Array[Class[_]] = arguments map (_._1) toArray
val values: Array[AnyRef] = arguments map (_._2) toArray
ReflectiveAccess.createInstance[RemoteSupport[ParsedTransportAddress]](remoteSettings.RemoteTransport, types, values) match {
case Left(problem)
log.error(problem, "Could not load remote transport layer")
throw problem
case Right(remote)
remote.start(None) //TODO Any application loader here?
val remoteClientLifeCycleHandler = system.systemActorOf(Props(new Actor {
def receive = {
case RemoteClientError(cause, remote, address) remote.shutdownClientConnection(address)
case RemoteClientDisconnected(remote, address) remote.shutdownClientConnection(address)
case _ //ignore other
}
}), "RemoteClientLifeCycleListener")
system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent])
system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent])
remote
}
}
}), "akka.remote.RemoteClientLifeCycleListener")
val eventStream = new NetworkEventStream(system)
val server: RemoteSupport = {
val arguments = Seq(
classOf[ActorSystem] -> system,
classOf[Remote] -> this)
val types: Array[Class[_]] = arguments map (_._1) toArray
val values: Array[AnyRef] = arguments map (_._2) toArray
ReflectiveAccess.createInstance[RemoteSupport](remoteSettings.RemoteTransport, types, values) match {
case Left(problem)
log.error(problem, "Could not load remote transport layer")
throw problem
case Right(remote)
remote.start(None) //TODO Any application loader here?
system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent])
system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent])
remote
}
log.info("Starting remote server on [{}]", remoteAddress)
}
log.info("Starting remote server on [{}]", remoteAddress)
}
/**
@ -86,7 +120,7 @@ class Remote(val system: ActorSystemImpl, val nodename: String, val remoteSettin
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteSystemDaemon(remote: Remote, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter)
class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter)
extends VirtualPathContainer(_path, _parent, _log) {
/**
@ -116,7 +150,7 @@ class RemoteSystemDaemon(remote: Remote, _path: ActorPath, _parent: InternalActo
override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match {
case message: RemoteSystemDaemonMessageProtocol
log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message.getMessageType, remote.nodename)
log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message.getMessageType, remote.remoteSettings.NodeName)
message.getMessageType match {
case USE handleUse(message)
@ -155,16 +189,17 @@ class RemoteSystemDaemon(remote: Remote, _path: ActorPath, _parent: InternalActo
}
import remote.remoteAddress
implicit val t = remote.transports
message.getActorPath match {
case RemoteActorPath(`remoteAddress`, elems) if elems.nonEmpty && elems.head == "remote"
case ParsedActorPath(`remoteAddress`, elems) if elems.nonEmpty && elems.head == "remote"
// TODO RK canonicalize path so as not to duplicate it always #1446
val subpath = elems.drop(1)
val path = remote.remoteDaemon.path / subpath
val supervisor = remote.system.actorFor(message.getSupervisor).asInstanceOf[InternalActorRef]
val actor = remote.system.provider.actorOf(remote.system, Props(creator = actorFactory), supervisor, path, true)
val supervisor = system.actorFor(message.getSupervisor).asInstanceOf[InternalActorRef]
val actor = system.provider.actorOf(system, Props(creator = actorFactory), supervisor, path, true)
addChild(subpath.mkString("/"), actor)
remote.system.deathWatch.subscribe(this, actor)
system.deathWatch.subscribe(this, actor)
case _
log.error("remote path does not match path from message [{}]", message)
}
@ -250,19 +285,17 @@ class RemoteSystemDaemon(remote: Remote, _path: ActorPath, _parent: InternalActo
}
}
class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLoader: Option[ClassLoader] = None) {
def provider = remote.system.asInstanceOf[ActorSystemImpl].provider
class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl, classLoader: Option[ClassLoader] = None) {
def originalReceiver = input.getRecipient.getPath
lazy val sender: ActorRef =
if (input.hasSender) provider.actorFor(provider.rootGuardian, input.getSender.getPath)
else remote.system.deadLetters
if (input.hasSender) system.provider.actorFor(system.provider.rootGuardian, input.getSender.getPath)
else system.deadLetters
lazy val recipient: InternalActorRef = provider.actorFor(provider.rootGuardian, originalReceiver)
lazy val recipient: InternalActorRef = system.provider.actorFor(system.provider.rootGuardian, originalReceiver)
lazy val payload: AnyRef = MessageSerializer.deserialize(remote.system, input.getMessage, classLoader)
lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage, classLoader)
override def toString = "RemoteMessage: " + payload + " to " + recipient + "<+{" + originalReceiver + "} from " + sender
}
@ -335,8 +368,9 @@ trait RemoteMarshallingOps {
case m l.!(m)(remoteMessage.sender)
}
case r: RemoteActorRef
implicit val t = remote.transports
remoteMessage.originalReceiver match {
case RemoteActorPath(address, _) if address == remote.remoteDaemon.path.address
case ParsedActorPath(address, _) if address == remote.remoteDaemon.path.address
r.!(remoteMessage.payload)(remoteMessage.sender)
case r log.error("dropping message {} for non-local recipient {}", remoteMessage.payload, r)
}