diff --git a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala index f2ebdb0cc0..23043c5303 100644 --- a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala @@ -35,9 +35,9 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 */ private case class State( version: Long = 0L, - failureStats: Map[RemoteAddress, FailureStats] = Map.empty[RemoteAddress, FailureStats], - intervalHistory: Map[RemoteAddress, Vector[Long]] = Map.empty[RemoteAddress, Vector[Long]], - timestamps: Map[RemoteAddress, Long] = Map.empty[RemoteAddress, Long]) + failureStats: Map[ParsedTransportAddress, FailureStats] = Map.empty[ParsedTransportAddress, FailureStats], + intervalHistory: Map[ParsedTransportAddress, Vector[Long]] = Map.empty[ParsedTransportAddress, Vector[Long]], + timestamps: Map[ParsedTransportAddress, Long] = Map.empty[ParsedTransportAddress, Long]) private val state = new AtomicReference[State](State()) @@ -45,13 +45,13 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 * Returns true if the connection is considered to be up and healthy * and returns false otherwise. */ - def isAvailable(connection: RemoteAddress): Boolean = phi(connection) < threshold + def isAvailable(connection: ParsedTransportAddress): Boolean = phi(connection) < threshold /** * Records a heartbeat for a connection. */ @tailrec - final def heartbeat(connection: RemoteAddress) { + final def heartbeat(connection: ParsedTransportAddress) { val oldState = state.get val latestTimestamp = oldState.timestamps.get(connection) @@ -132,7 +132,7 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 * Implementations of 'Cumulative Distribution Function' for Exponential Distribution. * For a discussion on the math read [https://issues.apache.org/jira/browse/CASSANDRA-2597]. */ - def phi(connection: RemoteAddress): Double = { + def phi(connection: ParsedTransportAddress): Double = { val oldState = state.get val oldTimestamp = oldState.timestamps.get(connection) if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections @@ -147,7 +147,7 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 * Removes the heartbeat management for a connection. */ @tailrec - final def remove(connection: RemoteAddress) { + final def remove(connection: ParsedTransportAddress) { val oldState = state.get if (oldState.failureStats.contains(connection)) { diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 7b6fd8a660..b1093bb943 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -28,8 +28,8 @@ import com.google.protobuf.ByteString * Interface for node membership change listener. */ trait NodeMembershipChangeListener { - def nodeConnected(node: RemoteAddress) - def nodeDisconnected(node: RemoteAddress) + def nodeConnected(node: ParsedTransportAddress) + def nodeDisconnected(node: ParsedTransportAddress) } /** @@ -37,22 +37,22 @@ trait NodeMembershipChangeListener { */ case class Gossip( version: VectorClock, - node: RemoteAddress, - availableNodes: Set[RemoteAddress] = Set.empty[RemoteAddress], - unavailableNodes: Set[RemoteAddress] = Set.empty[RemoteAddress]) + node: ParsedTransportAddress, + availableNodes: Set[ParsedTransportAddress] = Set.empty[ParsedTransportAddress], + unavailableNodes: Set[ParsedTransportAddress] = Set.empty[ParsedTransportAddress]) // ====== START - NEW GOSSIP IMPLEMENTATION ====== /* case class Gossip( version: VectorClock, - node: RemoteAddress, - leader: RemoteAddress, // FIXME leader is always head of 'members', so we probably don't need this field + node: ParsedTransportAddress, + leader: ParsedTransportAddress, // FIXME leader is always head of 'members', so we probably don't need this field members: SortedSet[Member] = SortetSet.empty[Member](Ordering.fromLessThan[String](_ > _)), // sorted set of members with their status, sorted by name seen: Map[Member, VectorClock] = Map.empty[Member, VectorClock], // for ring convergence pendingChanges: Option[Vector[PendingPartitioningChange]] = None, // for handoff meta: Option[Map[String, Array[Byte]]] = None) // misc meta-data - case class Member(address: RemoteAddress, status: MemberStatus) + case class Member(address: ParsedTransportAddress, status: MemberStatus) sealed trait MemberStatus object MemberStatus { @@ -73,8 +73,8 @@ case class Gossip( type VNodeMod = AnyRef case class PendingPartitioningChange( - owner: RemoteAddress, - nextOwner: RemoteAddress, + owner: ParsedTransportAddress, + nextOwner: ParsedTransportAddress, changes: Vector[VNodeMod], status: PendingPartitioningStatus) */ @@ -95,7 +95,7 @@ case class Gossip( * gossip to random seed with certain probability depending on number of unreachable, seed and live nodes. * */ -class Gossiper(remote: Remote) { +class Gossiper(remote: Remote, system: ActorSystemImpl) { /** * Represents the state for this Gossiper. Implemented using optimistic lockless concurrency, @@ -105,15 +105,21 @@ class Gossiper(remote: Remote) { currentGossip: Gossip, nodeMembershipChangeListeners: Set[NodeMembershipChangeListener] = Set.empty[NodeMembershipChangeListener]) - private val system = remote.system private val remoteSettings = remote.remoteSettings - private val serialization = SerializationExtension(system) + private val serialization = remote.serialization private val log = Logging(system, "Gossiper") private val failureDetector = remote.failureDetector - private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[RemoteAddress, ActorRef]) + private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[ParsedTransportAddress, ActorRef]) private val seeds = { - val seeds = remoteSettings.SeedNodes + val seeds = remoteSettings.SeedNodes flatMap { + case x: UnparsedTransportAddress ⇒ + x.parse(remote.transports) match { + case y: ParsedTransportAddress ⇒ Some(y) + case _ ⇒ None + } + case _ ⇒ None + } if (seeds.isEmpty) throw new ConfigurationException( "At least one seed node must be defined in the configuration [akka.cluster.seed-nodes]") else seeds @@ -161,7 +167,7 @@ class Gossiper(remote: Remote) { node ← oldAvailableNodes if connectionManager.connectionFor(node).isEmpty } { - val connectionFactory = () ⇒ new RemoteActorRef(remote.system.provider, remote.server, RootActorPath(gossipingNode) / remote.remoteDaemon.path.elements, Nobody, None) + val connectionFactory = () ⇒ system.actorFor(RootActorPath(RemoteSystemAddress(system.name, gossipingNode)) / "remote") connectionManager.putIfAbsent(node, connectionFactory) // create a new remote connection to the new node oldState.nodeMembershipChangeListeners foreach (_ nodeConnected node) // notify listeners about the new nodes } @@ -235,7 +241,7 @@ class Gossiper(remote: Remote) { /** * Gossips set of nodes passed in as argument. Returns 'true' if it gossiped to a "seed" node. */ - private def gossipTo(nodes: Set[RemoteAddress]): Boolean = { + private def gossipTo(nodes: Set[ParsedTransportAddress]): Boolean = { val peers = nodes filter (_ != address) // filter out myself val peer = selectRandomNode(peers) val oldState = state.get @@ -298,8 +304,8 @@ class Gossiper(remote: Remote) { private def newGossip(): Gossip = Gossip( version = VectorClock(), - node = address, - availableNodes = Set(address)) + node = address.transport, + availableNodes = Set(address.transport)) private def incrementVersionForGossip(from: Gossip): Gossip = { val newVersion = from.version.increment(nodeFingerprint, newTimestamp) @@ -327,7 +333,7 @@ class Gossiper(remote: Remote) { } } - private def selectRandomNode(nodes: Set[RemoteAddress]): RemoteAddress = { + private def selectRandomNode(nodes: Set[ParsedTransportAddress]): ParsedTransportAddress = { nodes.toList(random.nextInt(nodes.size)) } } diff --git a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala index 2cad35c948..94911e70fb 100644 --- a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala +++ b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala @@ -19,10 +19,10 @@ object NetworkEventStream { private sealed trait NetworkEventStreamEvent - private case class Register(listener: Listener, connectionAddress: RemoteAddress) + private case class Register(listener: Listener, connectionAddress: ParsedTransportAddress) extends NetworkEventStreamEvent - private case class Unregister(listener: Listener, connectionAddress: RemoteAddress) + private case class Unregister(listener: Listener, connectionAddress: ParsedTransportAddress) extends NetworkEventStreamEvent /** @@ -37,8 +37,8 @@ object NetworkEventStream { */ private class Channel extends Actor { - val listeners = new mutable.HashMap[RemoteAddress, mutable.Set[Listener]]() { - override def default(k: RemoteAddress) = mutable.Set.empty[Listener] + val listeners = new mutable.HashMap[ParsedTransportAddress, mutable.Set[Listener]]() { + override def default(k: ParsedTransportAddress) = mutable.Set.empty[Listener] } def receive = { @@ -70,12 +70,12 @@ class NetworkEventStream(system: ActorSystemImpl) { /** * Registers a network event stream listener (asyncronously). */ - def register(listener: Listener, connectionAddress: RemoteAddress) = + def register(listener: Listener, connectionAddress: ParsedTransportAddress) = sender ! Register(listener, connectionAddress) /** * Unregisters a network event stream listener (asyncronously) . */ - def unregister(listener: Listener, connectionAddress: RemoteAddress) = + def unregister(listener: Listener, connectionAddress: ParsedTransportAddress) = sender ! Unregister(listener, connectionAddress) } diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 3ab628a559..b8e00702a8 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -17,7 +17,7 @@ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ import java.net.InetSocketAddress import com.eaio.uuid.UUID import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression, SerializationExtension } -import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher } +import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher, MessageDispatcher } import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.TimeUnit.MILLISECONDS import akka.dispatch.SystemMessage @@ -28,55 +28,89 @@ import scala.annotation.tailrec * * @author Jonas Bonér */ -class Remote(val system: ActorSystemImpl, val nodename: String, val remoteSettings: RemoteSettings) { +class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSettings) { - val log = Logging(system, "Remote") - - import system._ import settings._ - val serialization = SerializationExtension(system) - - val remoteAddress = RemoteAddress(system.name, remoteSettings.serverSettings.Hostname, remoteSettings.serverSettings.Port) + // TODO make this really pluggable + val transports: TransportsMap = Map("akka" -> ((h, p) ⇒ Right(RemoteNettyAddress(h, p)))) + val remoteAddress: RemoteSystemAddress[ParsedTransportAddress] = { + val unparsedAddress = remoteSettings.serverSettings.URI match { + case RemoteAddressExtractor(a) ⇒ a + case x ⇒ throw new IllegalArgumentException("cannot parse URI " + x) + } + val parsed = unparsedAddress.parse(transports) match { + case Left(x) ⇒ throw new IllegalArgumentException(x.transport.error) + case Right(x) ⇒ x + } + parsed.copy(system = settings.name) + } val failureDetector = new AccrualFailureDetector(remoteSettings.FailureDetectorThreshold, remoteSettings.FailureDetectorMaxSampleSize) - val computeGridDispatcher = dispatcherFactory.fromConfig("akka.remote.compute-grid-dispatcher") + @volatile + private var _serialization: Serialization = _ + def serialization = _serialization - val remoteDaemon = new RemoteSystemDaemon(this, provider.rootPath / "remote", provider.rootGuardian, log) + @volatile + private var _computeGridDispatcher: MessageDispatcher = _ + def computeGridDispatcher = _computeGridDispatcher - val remoteClientLifeCycleHandler = system.actorOf(Props(new Actor { - def receive = { - case RemoteClientError(cause, remote, address) ⇒ remote.shutdownClientConnection(address) - case RemoteClientDisconnected(remote, address) ⇒ remote.shutdownClientConnection(address) - case _ ⇒ //ignore other + @volatile + private var _remoteDaemon: InternalActorRef = _ + def remoteDaemon = _remoteDaemon + + @volatile + private var _eventStream: NetworkEventStream = _ + def eventStream = _eventStream + + @volatile + private var _server: RemoteSupport[ParsedTransportAddress] = _ + def server = _server + + def init(system: ActorSystemImpl) = { + + val log = Logging(system, "Remote") + + _serialization = SerializationExtension(system) + _computeGridDispatcher = system.dispatcherFactory.fromConfig("akka.remote.compute-grid-dispatcher") + _remoteDaemon = new RemoteSystemDaemon(system, this, system.provider.rootPath / "remote", system.provider.rootGuardian, log) + _eventStream = new NetworkEventStream(system) + _server = { + val arguments = Seq( + classOf[ActorSystemImpl] -> system, + classOf[Remote] -> this, + classOf[RemoteSystemAddress[_ <: ParsedTransportAddress]] -> remoteAddress) + val types: Array[Class[_]] = arguments map (_._1) toArray + val values: Array[AnyRef] = arguments map (_._2) toArray + + ReflectiveAccess.createInstance[RemoteSupport[ParsedTransportAddress]](remoteSettings.RemoteTransport, types, values) match { + case Left(problem) ⇒ + + log.error(problem, "Could not load remote transport layer") + throw problem + + case Right(remote) ⇒ + + remote.start(None) //TODO Any application loader here? + + val remoteClientLifeCycleHandler = system.systemActorOf(Props(new Actor { + def receive = { + case RemoteClientError(cause, remote, address) ⇒ remote.shutdownClientConnection(address) + case RemoteClientDisconnected(remote, address) ⇒ remote.shutdownClientConnection(address) + case _ ⇒ //ignore other + } + }), "RemoteClientLifeCycleListener") + + system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent]) + system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) + + remote + } } - }), "akka.remote.RemoteClientLifeCycleListener") - val eventStream = new NetworkEventStream(system) - - val server: RemoteSupport = { - val arguments = Seq( - classOf[ActorSystem] -> system, - classOf[Remote] -> this) - val types: Array[Class[_]] = arguments map (_._1) toArray - val values: Array[AnyRef] = arguments map (_._2) toArray - - ReflectiveAccess.createInstance[RemoteSupport](remoteSettings.RemoteTransport, types, values) match { - case Left(problem) ⇒ - log.error(problem, "Could not load remote transport layer") - throw problem - case Right(remote) ⇒ - remote.start(None) //TODO Any application loader here? - - system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent]) - system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) - - remote - } + log.info("Starting remote server on [{}]", remoteAddress) } - - log.info("Starting remote server on [{}]", remoteAddress) } /** @@ -86,7 +120,7 @@ class Remote(val system: ActorSystemImpl, val nodename: String, val remoteSettin * * @author Jonas Bonér */ -class RemoteSystemDaemon(remote: Remote, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) +class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) extends VirtualPathContainer(_path, _parent, _log) { /** @@ -116,7 +150,7 @@ class RemoteSystemDaemon(remote: Remote, _path: ActorPath, _parent: InternalActo override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { case message: RemoteSystemDaemonMessageProtocol ⇒ - log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message.getMessageType, remote.nodename) + log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message.getMessageType, remote.remoteSettings.NodeName) message.getMessageType match { case USE ⇒ handleUse(message) @@ -155,16 +189,17 @@ class RemoteSystemDaemon(remote: Remote, _path: ActorPath, _parent: InternalActo } import remote.remoteAddress + implicit val t = remote.transports message.getActorPath match { - case RemoteActorPath(`remoteAddress`, elems) if elems.nonEmpty && elems.head == "remote" ⇒ + case ParsedActorPath(`remoteAddress`, elems) if elems.nonEmpty && elems.head == "remote" ⇒ // TODO RK canonicalize path so as not to duplicate it always #1446 val subpath = elems.drop(1) val path = remote.remoteDaemon.path / subpath - val supervisor = remote.system.actorFor(message.getSupervisor).asInstanceOf[InternalActorRef] - val actor = remote.system.provider.actorOf(remote.system, Props(creator = actorFactory), supervisor, path, true) + val supervisor = system.actorFor(message.getSupervisor).asInstanceOf[InternalActorRef] + val actor = system.provider.actorOf(system, Props(creator = actorFactory), supervisor, path, true) addChild(subpath.mkString("/"), actor) - remote.system.deathWatch.subscribe(this, actor) + system.deathWatch.subscribe(this, actor) case _ ⇒ log.error("remote path does not match path from message [{}]", message) } @@ -250,19 +285,17 @@ class RemoteSystemDaemon(remote: Remote, _path: ActorPath, _parent: InternalActo } } -class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLoader: Option[ClassLoader] = None) { - - def provider = remote.system.asInstanceOf[ActorSystemImpl].provider +class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl, classLoader: Option[ClassLoader] = None) { def originalReceiver = input.getRecipient.getPath lazy val sender: ActorRef = - if (input.hasSender) provider.actorFor(provider.rootGuardian, input.getSender.getPath) - else remote.system.deadLetters + if (input.hasSender) system.provider.actorFor(system.provider.rootGuardian, input.getSender.getPath) + else system.deadLetters - lazy val recipient: InternalActorRef = provider.actorFor(provider.rootGuardian, originalReceiver) + lazy val recipient: InternalActorRef = system.provider.actorFor(system.provider.rootGuardian, originalReceiver) - lazy val payload: AnyRef = MessageSerializer.deserialize(remote.system, input.getMessage, classLoader) + lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage, classLoader) override def toString = "RemoteMessage: " + payload + " to " + recipient + "<+{" + originalReceiver + "} from " + sender } @@ -335,8 +368,9 @@ trait RemoteMarshallingOps { case m ⇒ l.!(m)(remoteMessage.sender) } case r: RemoteActorRef ⇒ + implicit val t = remote.transports remoteMessage.originalReceiver match { - case RemoteActorPath(address, _) if address == remote.remoteDaemon.path.address ⇒ + case ParsedActorPath(address, _) if address == remote.remoteDaemon.path.address ⇒ r.!(remoteMessage.payload)(remoteMessage.sender) case r ⇒ log.error("dropping message {} for non-local recipient {}", remoteMessage.payload, r) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 07c855fd6a..2d974e25fd 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -24,6 +24,7 @@ import akka.dispatch.Promise import java.net.InetAddress import akka.serialization.SerializationExtension import akka.serialization.Serialization +import akka.config.ConfigurationException /** * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. @@ -52,21 +53,16 @@ class RemoteActorRefProvider( val deployer = new RemoteDeployer(settings) - val rootPath: ActorPath = RootActorPath(RemoteAddress(systemName, remoteSettings.serverSettings.Hostname, remoteSettings.serverSettings.Port)) + val remote = new Remote(settings, remoteSettings) + implicit val transports = remote.transports + + val rootPath: ActorPath = RootActorPath(remote.remoteAddress) private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer) - @volatile - private var serialization: Serialization = _ - - @volatile - private var _remote: Remote = _ - def remote = _remote - def init(system: ActorSystemImpl) { local.init(system) - serialization = SerializationExtension(system) - _remote = new Remote(system, nodename, remoteSettings) + remote.init(system) local.registerExtraNames(Map(("remote", remote.remoteDaemon))) terminationFuture.onComplete(_ ⇒ remote.server.shutdown()) } @@ -119,12 +115,15 @@ class RemoteActorRefProvider( deployment match { case Some(DeploymentConfig.Deploy(_, _, _, _, RemoteDeploymentConfig.RemoteScope(address))) ⇒ - if (address == rootPath.address) local.actorOf(system, props, supervisor, path, false) - else { - val rpath = RootActorPath(address) / "remote" / rootPath.address.hostPort / path.elements - useActorOnNode(rpath, props.creator, supervisor) - new RemoteActorRef(this, remote.server, rpath, supervisor, None) + else address.parse(remote.transports) match { + case Left(x) ⇒ + // FIXME RK this should be done within the deployer, i.e. the whole parsing business + throw new ConfigurationException("cannot parse remote address: " + x) + case Right(addr) ⇒ + val rpath = RootActorPath(addr) / "remote" / rootPath.address.hostPort / path.elements + useActorOnNode(rpath, props.creator, supervisor) + new RemoteActorRef(this, remote.server, rpath, supervisor, None) } case _ ⇒ local.actorOf(system, props, supervisor, path, systemService) @@ -132,13 +131,13 @@ class RemoteActorRefProvider( } def actorFor(path: ActorPath): InternalActorRef = path.root match { - case `rootPath` ⇒ actorFor(rootGuardian, path.elements) - case RootActorPath(_: RemoteAddress, _) ⇒ new RemoteActorRef(this, remote.server, path, Nobody, None) - case _ ⇒ local.actorFor(path) + case `rootPath` ⇒ actorFor(rootGuardian, path.elements) + case RootActorPath(_: RemoteSystemAddress[_], _) ⇒ new RemoteActorRef(this, remote.server, path, Nobody, None) + case _ ⇒ local.actorFor(path) } def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { - case RemoteActorPath(address, elems) ⇒ + case ParsedActorPath(address, elems) ⇒ if (address == rootPath.address) actorFor(rootGuardian, elems) else new RemoteActorRef(this, remote.server, new RootActorPath(address) / elems, Nobody, None) case _ ⇒ local.actorFor(ref, path) @@ -155,7 +154,7 @@ class RemoteActorRefProvider( log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, path) val actorFactoryBytes = - serialization.serialize(actorFactory) match { + remote.serialization.serialize(actorFactory) match { case Left(error) ⇒ throw error case Right(bytes) ⇒ if (remoteSettings.ShouldCompressData) LZF.compress(bytes) else bytes } @@ -180,7 +179,7 @@ class RemoteActorRefProvider( */ private[akka] class RemoteActorRef private[akka] ( provider: ActorRefProvider, - remote: RemoteSupport, + remote: RemoteSupport[ParsedTransportAddress], val path: ActorPath, val getParent: InternalActorRef, loader: Option[ClassLoader]) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index 49ae8d995c..5963bb23a2 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -20,15 +20,15 @@ import java.util.concurrent.atomic.AtomicReference * @author Jonas Bonér */ class RemoteConnectionManager( - system: ActorSystem, + system: ActorSystemImpl, remote: Remote, - initialConnections: Map[RemoteAddress, ActorRef] = Map.empty[RemoteAddress, ActorRef]) + initialConnections: Map[ParsedTransportAddress, ActorRef] = Map.empty[ParsedTransportAddress, ActorRef]) extends ConnectionManager { val log = Logging(system, "RemoteConnectionManager") // FIXME is this VersionedIterable really needed? It is not used I think. Complicates API. See 'def connections' etc. - case class State(version: Long, connections: Map[RemoteAddress, ActorRef]) + case class State(version: Long, connections: Map[ParsedTransportAddress, ActorRef]) extends VersionedIterable[ActorRef] { def iterable: Iterable[ActorRef] = connections.values } @@ -54,7 +54,7 @@ class RemoteConnectionManager( def size: Int = connections.connections.size - def connectionFor(address: RemoteAddress): Option[ActorRef] = connections.connections.get(address) + def connectionFor(address: ParsedTransportAddress): Option[ActorRef] = connections.connections.get(address) def isEmpty: Boolean = connections.connections.isEmpty @@ -63,7 +63,7 @@ class RemoteConnectionManager( } @tailrec - final def failOver(from: RemoteAddress, to: RemoteAddress) { + final def failOver(from: ParsedTransportAddress, to: ParsedTransportAddress) { log.debug("Failing over connection from [{}] to [{}]", from, to) val oldState = state.get @@ -94,8 +94,8 @@ class RemoteConnectionManager( val oldState = state.get() var changed = false - var faultyAddress: RemoteAddress = null - var newConnections = Map.empty[RemoteAddress, ActorRef] + var faultyAddress: ParsedTransportAddress = null + var newConnections = Map.empty[ParsedTransportAddress, ActorRef] oldState.connections.keys foreach { address ⇒ val actorRef: ActorRef = oldState.connections.get(address).get @@ -121,7 +121,7 @@ class RemoteConnectionManager( } @tailrec - final def putIfAbsent(address: RemoteAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { + final def putIfAbsent(address: ParsedTransportAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { val oldState = state.get() val oldConnections = oldState.connections @@ -148,6 +148,6 @@ class RemoteConnectionManager( } } - private[remote] def newConnection(remoteAddress: RemoteAddress, actorPath: ActorPath) = - new RemoteActorRef(remote.system.provider, remote.server, actorPath, Nobody, None) + private[remote] def newConnection(remoteAddress: ParsedTransportAddress, actorPath: ActorPath) = + new RemoteActorRef(system.provider, remote.server, actorPath, Nobody, None) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index 8e790991db..69e2e553ba 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -11,7 +11,7 @@ import akka.config.ConfigurationException object RemoteDeploymentConfig { - case class RemoteScope(node: RemoteAddress) extends DeploymentConfig.Scope + case class RemoteScope(node: UnparsedSystemAddress[UnparsedTransportAddress]) extends DeploymentConfig.Scope } @@ -28,7 +28,7 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings val transform: Deploy ⇒ Deploy = if (deployment.hasPath("remote")) deployment.getString("remote") match { case RemoteAddressExtractor(r) ⇒ (d ⇒ d.copy(scope = RemoteScope(r))) - case _ ⇒ identity + case x ⇒ identity } else identity diff --git a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala index 7967813575..4f43c8f248 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala @@ -26,7 +26,9 @@ class RemoteSettings(val config: Config, val systemName: String) extends Extensi // TODO cluster config will go into akka-cluster-reference.conf when we enable that module val ClusterName = getString("akka.cluster.name") - val SeedNodes = Set.empty[RemoteAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.toSeq.map(RemoteAddress(_, systemName)) + val SeedNodes = Set.empty[RemoteNettyAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.collect { + case RemoteAddressExtractor(addr) ⇒ addr.transport + } val NodeName: String = config.getString("akka.cluster.nodename") match { case "" ⇒ throw new ConfigurationException("akka.cluster.nodename configuration property must be defined") @@ -73,5 +75,8 @@ class RemoteSettings(val config: Config, val systemName: String) extends Extensi val ConnectionTimeout = Duration(config.getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS) val Backlog = config.getInt("akka.remote.server.backlog") + + // TODO handle the system name right and move this to config file syntax + val URI = "akka://sys@" + Hostname + ":" + Port } } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala b/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala index 21da080f87..6dbc50eab8 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala @@ -13,49 +13,111 @@ import java.net.URI import java.net.URISyntaxException import java.net.InetAddress import java.net.UnknownHostException +import java.net.UnknownServiceException -object RemoteAddress { - def apply(system: String, host: String, port: Int): RemoteAddress = { - // TODO check whether we should not rather bail out early - val ip = try InetAddress.getByName(host) catch { case _: UnknownHostException ⇒ null } - new RemoteAddress(system, host, ip, port) - } +/** + * Interface for remote transports to encode their addresses. The three parts + * are named according to the URI spec (precisely java.net.URI) which is used + * for parsing. That means that the address’ parts must conform to what an + * URI expects, but otherwise each transport may assign a different meaning + * to these parts. + */ +trait RemoteTransportAddress { + def protocol: String + def host: String + def port: Int +} - val RE = """(?:(\w+)@)?(\w+):(\d+)""".r - object Int { - def unapply(s: String) = Some(Integer.parseInt(s)) - } - def apply(stringRep: String, defaultSystem: String): RemoteAddress = stringRep match { - case RE(sys, host, Int(port)) ⇒ apply(if (sys != null) sys else defaultSystem, host, port) - case _ ⇒ throw new IllegalArgumentException(stringRep + " is not a valid remote address [system@host:port]") - } +trait ParsedTransportAddress extends RemoteTransportAddress +case class RemoteNettyAddress(host: String, ip: Option[InetAddress], port: Int) extends ParsedTransportAddress { + def protocol = "akka" +} + +object RemoteNettyAddress { + def apply(host: String, port: Int): RemoteNettyAddress = { + // FIXME this may BLOCK for extended periods of time! + val ip = try Some(InetAddress.getByName(host)) catch { case _: UnknownHostException ⇒ None } + new RemoteNettyAddress(host, ip, port) + } + def apply(s: String): RemoteNettyAddress = { + val RE = """([^:]+):(\d+)""".r + s match { + case RE(h, p) ⇒ apply(h, Integer.parseInt(p)) + case _ ⇒ throw new IllegalArgumentException("cannot parse " + s + " as ") + } + } +} + +case class UnparsedTransportAddress(protocol: String, host: String, port: Int) extends RemoteTransportAddress { + def parse(transports: TransportsMap): RemoteTransportAddress = + transports.get(protocol) + .map(_(host, port)) + .toRight("protocol " + protocol + " not known") + .joinRight.fold(UnparseableTransportAddress(protocol, host, port, _), identity) +} + +case class UnparseableTransportAddress(protocol: String, host: String, port: Int, error: String) extends RemoteTransportAddress + +case class RemoteSystemAddress[+T <: ParsedTransportAddress](system: String, transport: T) extends Address { + def protocol = transport.protocol + @transient + lazy val hostPort = system + "@" + transport.host + ":" + transport.port +} + +case class UnparsedSystemAddress[+T <: RemoteTransportAddress](system: Option[String], transport: T) { + def parse(transports: TransportsMap): Either[UnparsedSystemAddress[UnparseableTransportAddress], RemoteSystemAddress[ParsedTransportAddress]] = + system match { + case Some(sys) ⇒ + transport match { + case x: ParsedTransportAddress ⇒ Right(RemoteSystemAddress(sys, x)) + case y: UnparsedTransportAddress ⇒ + y.parse(transports) match { + case x: ParsedTransportAddress ⇒ Right(RemoteSystemAddress(sys, x)) + case y: UnparseableTransportAddress ⇒ Left(UnparsedSystemAddress(system, y)) + case z ⇒ Left(UnparsedSystemAddress(system, UnparseableTransportAddress(z.protocol, z.host, z.port, "cannot parse " + z))) + } + case z ⇒ Left(UnparsedSystemAddress(system, UnparseableTransportAddress(z.protocol, z.host, z.port, "cannot parse " + z))) + } + case None ⇒ Left(UnparsedSystemAddress(None, UnparseableTransportAddress(transport.protocol, transport.host, transport.port, "no system name specified"))) + } } object RemoteAddressExtractor { - def unapply(s: String): Option[RemoteAddress] = { + def unapply(s: String): Option[UnparsedSystemAddress[UnparsedTransportAddress]] = { try { - val uri = new URI("akka://" + s) - if (uri.getScheme != "akka" || uri.getUserInfo == null || uri.getHost == null || uri.getPort == -1) None - else Some(RemoteAddress(uri.getUserInfo, uri.getHost, uri.getPort)) + val uri = new URI(s) + if (uri.getScheme == null || uri.getHost == null || uri.getPort == -1) None + else Some(UnparsedSystemAddress(Option(uri.getUserInfo), UnparsedTransportAddress(uri.getScheme, uri.getHost, uri.getPort))) } catch { case _: URISyntaxException ⇒ None } } } -case class RemoteAddress(system: String, host: String, ip: InetAddress, port: Int) extends Address { - def protocol = "akka" - @transient - lazy val hostPort = system + "@" + host + ":" + port -} - object RemoteActorPath { - def unapply(addr: String): Option[(RemoteAddress, Iterable[String])] = { + def unapply(addr: String): Option[(UnparsedSystemAddress[UnparsedTransportAddress], Iterable[String])] = { try { val uri = new URI(addr) - if (uri.getScheme != "akka" || uri.getUserInfo == null || uri.getHost == null || uri.getPort == -1 || uri.getPath == null) None - else Some(RemoteAddress(uri.getUserInfo, uri.getHost, uri.getPort), ActorPath.split(uri.getPath).drop(1)) + if (uri.getScheme == null || uri.getUserInfo == null || uri.getHost == null || uri.getPort == -1 || uri.getPath == null) None + else Some(UnparsedSystemAddress(Some(uri.getUserInfo), UnparsedTransportAddress(uri.getScheme, uri.getHost, uri.getPort)), + ActorPath.split(uri.getPath).drop(1)) + } catch { + case _: URISyntaxException ⇒ None + } + } +} + +object ParsedActorPath { + def unapply(addr: String)(implicit transports: TransportsMap): Option[(RemoteSystemAddress[ParsedTransportAddress], Iterable[String])] = { + try { + val uri = new URI(addr) + if (uri.getScheme == null || uri.getUserInfo == null || uri.getHost == null || uri.getPort == -1 || uri.getPath == null) None + else + UnparsedSystemAddress(Some(uri.getUserInfo), UnparsedTransportAddress(uri.getScheme, uri.getHost, uri.getPort)).parse(transports) match { + case Left(_) ⇒ None + case Right(x) ⇒ Some(x, ActorPath.split(uri.getPath).drop(1)) + } } catch { case _: URISyntaxException ⇒ None } @@ -77,70 +139,70 @@ sealed trait RemoteLifeCycleEvent * Life-cycle events for RemoteClient. */ trait RemoteClientLifeCycleEvent extends RemoteLifeCycleEvent { - def remoteAddress: RemoteAddress + def remoteAddress: ParsedTransportAddress } -case class RemoteClientError( +case class RemoteClientError[T <: ParsedTransportAddress]( @BeanProperty cause: Throwable, - @BeanProperty remote: RemoteSupport, - @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent + @BeanProperty remote: RemoteSupport[T], + @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent -case class RemoteClientDisconnected( - @BeanProperty remote: RemoteSupport, - @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientDisconnected[T <: ParsedTransportAddress]( + @BeanProperty remote: RemoteSupport[T], + @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent -case class RemoteClientConnected( - @BeanProperty remote: RemoteSupport, - @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientConnected[T <: ParsedTransportAddress]( + @BeanProperty remote: RemoteSupport[T], + @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent -case class RemoteClientStarted( - @BeanProperty remote: RemoteSupport, - @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientStarted[T <: ParsedTransportAddress]( + @BeanProperty remote: RemoteSupport[T], + @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent -case class RemoteClientShutdown( - @BeanProperty remote: RemoteSupport, - @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientShutdown[T <: ParsedTransportAddress]( + @BeanProperty remote: RemoteSupport[T], + @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent -case class RemoteClientWriteFailed( +case class RemoteClientWriteFailed[T <: ParsedTransportAddress]( @BeanProperty request: AnyRef, @BeanProperty cause: Throwable, - @BeanProperty remote: RemoteSupport, - @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent + @BeanProperty remote: RemoteSupport[T], + @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent /** * Life-cycle events for RemoteServer. */ trait RemoteServerLifeCycleEvent extends RemoteLifeCycleEvent -case class RemoteServerStarted( - @BeanProperty remote: RemoteSupport) extends RemoteServerLifeCycleEvent -case class RemoteServerShutdown( - @BeanProperty remote: RemoteSupport) extends RemoteServerLifeCycleEvent -case class RemoteServerError( +case class RemoteServerStarted[T <: ParsedTransportAddress]( + @BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent +case class RemoteServerShutdown[T <: ParsedTransportAddress]( + @BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent +case class RemoteServerError[T <: ParsedTransportAddress]( @BeanProperty val cause: Throwable, - @BeanProperty remote: RemoteSupport) extends RemoteServerLifeCycleEvent -case class RemoteServerClientConnected( - @BeanProperty remote: RemoteSupport, - @BeanProperty val clientAddress: Option[RemoteAddress]) extends RemoteServerLifeCycleEvent -case class RemoteServerClientDisconnected( - @BeanProperty remote: RemoteSupport, - @BeanProperty val clientAddress: Option[RemoteAddress]) extends RemoteServerLifeCycleEvent -case class RemoteServerClientClosed( - @BeanProperty remote: RemoteSupport, - @BeanProperty val clientAddress: Option[RemoteAddress]) extends RemoteServerLifeCycleEvent -case class RemoteServerWriteFailed( + @BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent +case class RemoteServerClientConnected[T <: ParsedTransportAddress]( + @BeanProperty remote: RemoteSupport[T], + @BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent +case class RemoteServerClientDisconnected[T <: ParsedTransportAddress]( + @BeanProperty remote: RemoteSupport[T], + @BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent +case class RemoteServerClientClosed[T <: ParsedTransportAddress]( + @BeanProperty remote: RemoteSupport[T], + @BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent +case class RemoteServerWriteFailed[T <: ParsedTransportAddress]( @BeanProperty request: AnyRef, @BeanProperty cause: Throwable, - @BeanProperty server: RemoteSupport, - @BeanProperty remoteAddress: Option[RemoteAddress]) extends RemoteServerLifeCycleEvent + @BeanProperty server: RemoteSupport[T], + @BeanProperty remoteAddress: Option[T]) extends RemoteServerLifeCycleEvent /** * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. */ -class RemoteClientException private[akka] ( +class RemoteClientException[T <: ParsedTransportAddress] private[akka] ( message: String, - @BeanProperty val client: RemoteSupport, - val remoteAddress: RemoteAddress, cause: Throwable = null) extends AkkaException(message, cause) + @BeanProperty val client: RemoteSupport[T], + val remoteAddress: T, cause: Throwable = null) extends AkkaException(message, cause) /** * Thrown when the remote server actor dispatching fails for some reason. @@ -158,7 +220,7 @@ case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorExcept override def printStackTrace(printWriter: PrintWriter) = cause.printStackTrace(printWriter) } -abstract class RemoteSupport(val system: ActorSystem) { +abstract class RemoteSupport[-T <: ParsedTransportAddress](val system: ActorSystemImpl) { /** * Shuts down the remoting */ @@ -177,12 +239,12 @@ abstract class RemoteSupport(val system: ActorSystem) { /** * Shuts down a specific client connected to the supplied remote address returns true if successful */ - def shutdownClientConnection(address: RemoteAddress): Boolean + def shutdownClientConnection(address: T): Boolean /** * Restarts a specific client connected to the supplied remote address, but only if the client is not shut down */ - def restartClientConnection(address: RemoteAddress): Boolean + def restartClientConnection(address: T): Boolean /** Methods that needs to be implemented by a transport **/ diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index a8ff2ae024..9e6fbd8df6 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -38,7 +38,7 @@ class RemoteClientMessageBufferException(message: String, cause: Throwable = nul */ abstract class RemoteClient private[akka] ( val remoteSupport: NettyRemoteSupport, - val remoteAddress: RemoteAddress) { + val remoteAddress: RemoteNettyAddress) { val log = Logging(remoteSupport.system, "RemoteClient") @@ -54,7 +54,7 @@ abstract class RemoteClient private[akka] ( def shutdown(): Boolean - def isBoundTo(address: RemoteAddress): Boolean = remoteAddress == address + def isBoundTo(address: RemoteNettyAddress): Boolean = remoteAddress == address /** * Converts the message to the wireprotocol and sends the message across the wire @@ -71,7 +71,7 @@ abstract class RemoteClient private[akka] ( * Sends the message across the wire */ def send(request: RemoteMessageProtocol): Unit = { - log.debug("Sending message: {}", new RemoteMessage(request, remoteSupport)) + log.debug("Sending message: {}", new RemoteMessage(request, remoteSupport.system)) try { val payload = remoteSupport.createMessageSendEnvelope(request) @@ -95,7 +95,7 @@ abstract class RemoteClient private[akka] ( class PassiveRemoteClient(val currentChannel: Channel, remoteSupport: NettyRemoteSupport, - remoteAddress: RemoteAddress) + remoteAddress: RemoteNettyAddress) extends RemoteClient(remoteSupport, remoteAddress) { def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn { @@ -118,10 +118,12 @@ class PassiveRemoteClient(val currentChannel: Channel, */ class ActiveRemoteClient private[akka] ( remoteSupport: NettyRemoteSupport, - remoteAddress: RemoteAddress, + remoteAddress: RemoteNettyAddress, val loader: Option[ClassLoader] = None) extends RemoteClient(remoteSupport, remoteAddress) { + if (remoteAddress.ip.isEmpty) throw new java.net.UnknownHostException(remoteAddress.host) + import remoteSupport.clientSettings._ //TODO rewrite to a wrapper object (minimize volatile access and maximize encapsulation) @@ -151,8 +153,8 @@ class ActiveRemoteClient private[akka] ( if (SecureCookie.nonEmpty) handshake.setCookie(SecureCookie.get) handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder .setSystem(senderRemoteAddress.system) - .setHostname(senderRemoteAddress.host) - .setPort(senderRemoteAddress.port) + .setHostname(senderRemoteAddress.transport.host) + .setPort(senderRemoteAddress.transport.port) .build) connection.getChannel.write(remoteSupport.createControlEnvelope(handshake.build)) } @@ -165,7 +167,7 @@ class ActiveRemoteClient private[akka] ( def attemptReconnect(): Boolean = { log.debug("Remote client reconnecting to [{}]", remoteAddress) - val connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip, remoteAddress.port)) + val connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port)) openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { @@ -187,7 +189,7 @@ class ActiveRemoteClient private[akka] ( log.debug("Starting remote client connection to [{}]", remoteAddress) - connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip, remoteAddress.port)) + connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port)) val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) @@ -248,7 +250,7 @@ class ActiveRemoteClient private[akka] ( class ActiveRemoteClientPipelineFactory( name: String, bootstrap: ClientBootstrap, - remoteAddress: RemoteAddress, + remoteAddress: RemoteNettyAddress, client: ActiveRemoteClient) extends ChannelPipelineFactory { import client.remoteSupport.clientSettings._ @@ -272,7 +274,7 @@ class ActiveRemoteClientPipelineFactory( class ActiveRemoteClientHandler( val name: String, val bootstrap: ClientBootstrap, - val remoteAddress: RemoteAddress, + val remoteAddress: RemoteNettyAddress, val timer: HashedWheelTimer, val client: ActiveRemoteClient) extends SimpleChannelUpstreamHandler { @@ -292,7 +294,7 @@ class ActiveRemoteClientHandler( } case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ - client.remoteSupport.receiveMessage(new RemoteMessage(arp.getMessage, client.remoteSupport, client.loader)) + client.remoteSupport.receiveMessage(new RemoteMessage(arp.getMessage, client.remoteSupport.system, client.loader)) case other ⇒ throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.remoteSupport, client.remoteAddress) @@ -350,7 +352,8 @@ class ActiveRemoteClientHandler( /** * Provides the implementation of the Netty remote support */ -class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends RemoteSupport(_system) with RemoteMarshallingOps { +class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val address: RemoteSystemAddress[RemoteNettyAddress]) + extends RemoteSupport[RemoteNettyAddress](_system) with RemoteMarshallingOps { val log = Logging(system, "NettyRemoteSupport") val serverSettings = remote.remoteSettings.serverSettings @@ -360,7 +363,7 @@ class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends Remot _system.registerOnTermination(timer.stop()) //Shut this guy down at the end - private val remoteClients = new HashMap[RemoteAddress, RemoteClient] + private val remoteClients = new HashMap[RemoteNettyAddress, RemoteClient] private val clientsLock = new ReentrantReadWriteLock override protected def useUntrustedMode = serverSettings.UntrustedMode @@ -371,7 +374,13 @@ class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends Remot recipient: RemoteActorRef, loader: Option[ClassLoader]): Unit = { - val recipientAddress = recipient.path.address.asInstanceOf[RemoteAddress] + val recipientAddress = recipient.path.address match { + case RemoteSystemAddress(sys, transport) ⇒ + transport match { + case x: RemoteNettyAddress ⇒ x + case _ ⇒ throw new IllegalArgumentException("invoking NettyRemoteSupport.send with foreign target address " + transport) + } + } clientsLock.readLock.lock try { @@ -404,7 +413,7 @@ class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends Remot } } - def bindClient(remoteAddress: RemoteAddress, client: RemoteClient, putIfAbsent: Boolean = false): Boolean = { + def bindClient(remoteAddress: RemoteNettyAddress, client: RemoteClient, putIfAbsent: Boolean = false): Boolean = { clientsLock.writeLock().lock() try { if (putIfAbsent && remoteClients.contains(remoteAddress)) false @@ -418,7 +427,7 @@ class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends Remot } } - def unbindClient(remoteAddress: RemoteAddress): Unit = { + def unbindClient(remoteAddress: RemoteNettyAddress): Unit = { clientsLock.writeLock().lock() try { remoteClients.foreach { case (k, v) ⇒ if (v.isBoundTo(remoteAddress)) { v.shutdown(); remoteClients.remove(k) } } @@ -427,7 +436,7 @@ class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends Remot } } - def shutdownClientConnection(remoteAddress: RemoteAddress): Boolean = { + def shutdownClientConnection(remoteAddress: RemoteNettyAddress): Boolean = { clientsLock.writeLock().lock() try { remoteClients.remove(remoteAddress) match { @@ -439,7 +448,7 @@ class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends Remot } } - def restartClientConnection(remoteAddress: RemoteAddress): Boolean = { + def restartClientConnection(remoteAddress: RemoteNettyAddress): Boolean = { clientsLock.readLock().lock() try { remoteClients.get(remoteAddress) match { @@ -468,7 +477,7 @@ class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends Remot def start(loader: Option[ClassLoader] = None): Unit = { _isRunning switchOn { try { - currentServer.set(Some(new NettyRemoteServer(this, loader))) + currentServer.set(Some(new NettyRemoteServer(this, loader, address))) } catch { case e: Exception ⇒ notifyListeners(RemoteServerError(e, this)) } @@ -491,11 +500,14 @@ class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends Remot } } -class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Option[ClassLoader]) { +class NettyRemoteServer( + val remoteSupport: NettyRemoteSupport, + val loader: Option[ClassLoader], + val address: RemoteSystemAddress[RemoteNettyAddress]) { val log = Logging(remoteSupport.system, "NettyRemoteServer") import remoteSupport.serverSettings._ - val address = remoteSupport.remote.remoteAddress + if (address.transport.ip.isEmpty) throw new java.net.UnknownHostException(address.transport.host) val name = "NettyRemoteServer@" + address @@ -514,7 +526,7 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio bootstrap.setOption("child.reuseAddress", true) bootstrap.setOption("child.connectTimeoutMillis", ConnectionTimeout.toMillis) - openChannels.add(bootstrap.bind(new InetSocketAddress(address.ip, address.port))) + openChannels.add(bootstrap.bind(new InetSocketAddress(address.transport.ip.get, address.transport.port))) remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport)) def shutdown() { @@ -523,8 +535,8 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN) b.setOrigin(RemoteProtocol.AddressProtocol.newBuilder .setSystem(address.system) - .setHostname(address.host) - .setPort(address.port) + .setHostname(address.transport.host) + .setPort(address.transport.port) .build) if (SecureCookie.nonEmpty) b.setCookie(SecureCookie.get) @@ -639,20 +651,20 @@ class RemoteServerHandler( remoteSupport.unbindClient(address) remoteSupport.notifyListeners(RemoteServerClientClosed(remoteSupport, s)) case None ⇒ - remoteSupport.notifyListeners(RemoteServerClientClosed(remoteSupport, None)) + remoteSupport.notifyListeners(RemoteServerClientClosed[RemoteNettyAddress](remoteSupport, None)) } override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try { event.getMessage match { case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ - remoteSupport.receiveMessage(new RemoteMessage(remote.getMessage, remoteSupport, applicationLoader)) + remoteSupport.receiveMessage(new RemoteMessage(remote.getMessage, remoteSupport.system, applicationLoader)) case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒ val instruction = remote.getInstruction instruction.getCommandType match { case CommandType.CONNECT if UsePassiveConnections ⇒ val origin = instruction.getOrigin - val inbound = RemoteAddress(origin.getSystem, origin.getHostname, origin.getPort) + val inbound = RemoteNettyAddress(origin.getHostname, origin.getPort) val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound) remoteSupport.bindClient(inbound, client) case CommandType.SHUTDOWN ⇒ //FIXME Dispose passive connection here, ticket #1410 @@ -669,9 +681,9 @@ class RemoteServerHandler( event.getChannel.close() } - private def getClientAddress(c: Channel): Option[RemoteAddress] = + private def getClientAddress(c: Channel): Option[RemoteNettyAddress] = c.getRemoteAddress match { - case inet: InetSocketAddress ⇒ Some(RemoteAddress("BORKED", inet.getHostName, inet.getPort)) // FIXME RK Broken! + case inet: InetSocketAddress ⇒ Some(RemoteNettyAddress(inet.getHostName, inet.getPort)) case _ ⇒ None } } diff --git a/akka-remote/src/main/scala/akka/remote/package.scala b/akka-remote/src/main/scala/akka/remote/package.scala new file mode 100644 index 0000000000..e2514d6592 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/package.scala @@ -0,0 +1,8 @@ +/** + * Copyright (C) 2009-2010 Typesafe Inc. + */ +package akka + +package object remote { + type TransportsMap = Map[String, (String, Int) ⇒ Either[String, RemoteTransportAddress]] +} \ No newline at end of file diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode1.conf b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode1.conf index 44ecc0f408..ebda77c369 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode1.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode1.conf @@ -3,7 +3,7 @@ akka { actor { provider = "akka.remote.RemoteActorRefProvider" deployment { - /service-hello.remote = "AkkaRemoteSpec@localhost:9991" + /service-hello.remote = "akka://AkkaRemoteSpec@localhost:9991" } } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode2.conf b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode2.conf index 35fe98a817..88e1ac4ca5 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode2.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode2.conf @@ -3,7 +3,7 @@ akka { actor { provider = "akka.remote.RemoteActorRefProvider" deployment { - /service-hello.remote = "AkkaRemoteSpec@localhost:9991" + /service-hello.remote = "akka://AkkaRemoteSpec@localhost:9991" } } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf index 44ecc0f408..ebda77c369 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf @@ -3,7 +3,7 @@ akka { actor { provider = "akka.remote.RemoteActorRefProvider" deployment { - /service-hello.remote = "AkkaRemoteSpec@localhost:9991" + /service-hello.remote = "akka://AkkaRemoteSpec@localhost:9991" } } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf index 44ecc0f408..ebda77c369 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf @@ -3,7 +3,7 @@ akka { actor { provider = "akka.remote.RemoteActorRefProvider" deployment { - /service-hello.remote = "AkkaRemoteSpec@localhost:9991" + /service-hello.remote = "akka://AkkaRemoteSpec@localhost:9991" } } } diff --git a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala index 94e2c0272c..5ac2f281ab 100644 --- a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala @@ -6,10 +6,10 @@ import akka.testkit.AkkaSpec class AccrualFailureDetectorSpec extends AkkaSpec { "An AccrualFailureDetector" must { - val conn = RemoteAddress("tester", "localhost", 2552) + val conn = RemoteNettyAddress("localhost", 2552) "mark node as available after a series of successful heartbeats" in { - val fd = new AccrualFailureDetector + val fd = new AccrualFailureDetector() fd.heartbeat(conn) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index cb4b3c2f52..2d406e74c4 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -38,9 +38,9 @@ akka { port = 12345 } actor.deployment { - /blub.remote = "remote_sys@localhost:12346" - /looker/child.remote = "remote_sys@localhost:12346" - /looker/child/grandchild.remote = "RemoteCommunicationSpec@localhost:12345" + /blub.remote = "akka://remote_sys@localhost:12346" + /looker/child.remote = "akka://remote_sys@localhost:12346" + /looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345" } } """) with ImplicitSender { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala index c8daf3b13b..e94e0b8fd3 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala @@ -17,7 +17,7 @@ object RemoteDeployerSpec { /user/service2 { router = round-robin nr-of-instances = 3 - remote = "sys@wallace:2552" + remote = "akka://sys@wallace:2552" } } """, ConfigParseOptions.defaults) @@ -44,7 +44,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { None, RoundRobin, NrOfInstances(3), - RemoteScope(RemoteAddress("sys", "wallace", 2552))))) + RemoteScope(UnparsedSystemAddress(Some("sys"), UnparsedTransportAddress("akka", "wallace", 2552)))))) } }