From ba9281e26725ea6f29429048b28c9adf0587d814 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 10 Nov 2011 17:39:04 +0100 Subject: [PATCH] Removing InetSocketAddress as much as possible from the remoting, switching to RemoteAddress for an easier way forward with different transports. Also removing quite a few allocations internally in the remoting as a side-efect of this. --- .../src/main/scala/akka/AkkaApplication.scala | 8 +-- .../src/main/scala/akka/actor/ActorRef.scala | 4 +- .../scala/akka/remote/RemoteInterface.scala | 52 +++++++++----- .../akka/routing/ConnectionManager.scala | 9 +-- .../main/scala/akka/util/RemoteAddress.scala | 29 -------- .../akka/remote/AccrualFailureDetector.scala | 15 ++-- .../src/main/scala/akka/remote/Gossiper.scala | 28 ++++---- .../akka/remote/NetworkEventStream.scala | 14 ++-- .../akka/remote/RemoteActorRefProvider.scala | 18 ++--- .../akka/remote/RemoteConnectionManager.scala | 17 +++-- .../remote/netty/NettyRemoteSupport.scala | 69 ++++++++----------- .../remote/AccrualFailureDetectorSpec.scala | 5 +- 12 files changed, 122 insertions(+), 146 deletions(-) delete mode 100644 akka-actor/src/main/scala/akka/util/RemoteAddress.scala diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index 7ec88972fe..71261ceb35 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -14,9 +14,9 @@ import akka.dispatch.{ Dispatchers, Future } import akka.util.Duration import akka.util.ReflectiveAccess import akka.routing.Routing -import akka.remote.RemoteSupport import akka.serialization.Serialization import java.net.InetSocketAddress +import remote.{ RemoteAddress, RemoteSupport } object AkkaApplication { @@ -155,7 +155,7 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor case value ⇒ value } - val defaultAddress = new InetSocketAddress(System.getProperty("akka.remote.hostname") match { + val defaultAddress = RemoteAddress(System.getProperty("akka.remote.hostname") match { case null | "" ⇒ InetAddress.getLocalHost.getHostAddress case value ⇒ value }, System.getProperty("akka.remote.port") match { @@ -163,9 +163,9 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor case value ⇒ value.toInt }) - def hostname: String = defaultAddress.getAddress.getHostAddress + def hostname: String = defaultAddress.hostname - def port: Int = defaultAddress.getPort + def port: Int = defaultAddress.port // this provides basic logging (to stdout) until .start() is called below val mainbus = new MainBus(DebugMainBus) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 6a3392f910..eb62873a30 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -13,6 +13,7 @@ import akka.event.ActorEventBus import akka.serialization.Serialization import akka.actor.DeadLetterActorRef.SerializedDeadLetterActorRef import java.net.InetSocketAddress +import akka.remote.RemoteAddress /** * ActorRef is an immutable and serializable handle to an Actor. @@ -286,7 +287,8 @@ trait ScalaActorRef { ref: ActorRef ⇒ case class SerializedActorRef(address: String, hostname: String, port: Int) { import akka.serialization.Serialization.app - def this(address: String, inet: InetSocketAddress) = this(address, inet.getAddress.getHostAddress, inet.getPort) + def this(address: String, remoteAddress: RemoteAddress) = this(address, remoteAddress.hostname, remoteAddress.port) + def this(address: String, remoteAddress: InetSocketAddress) = this(address, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort) //TODO FIXME REMOVE @throws(classOf[java.io.ObjectStreamException]) def readResolve(): AnyRef = { diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index b5bbf3baa9..b23feb5b54 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -8,9 +8,29 @@ import akka.actor._ import akka.{ AkkaException, AkkaApplication } import scala.reflect.BeanProperty +import java.io.{ PrintWriter, PrintStream } import java.net.InetSocketAddress -import java.io.{ PrintWriter, PrintStream } + +object RemoteAddress { + def apply(host: String, port: Int): RemoteAddress = apply(new InetSocketAddress(host, port)) + def apply(inetAddress: InetSocketAddress): RemoteAddress = inetAddress match { + case null ⇒ null + case inet ⇒ + val host = inet.getAddress.getHostAddress + val portNo = inet.getPort + new RemoteAddress { + def hostname = host + def port = portNo + } + } +} + +trait RemoteAddress { + def hostname: String + def port: Int + override def toString = "" + hostname + ":" + port +} class RemoteException(message: String) extends AkkaException(message) @@ -27,35 +47,35 @@ sealed trait RemoteLifeCycleEvent * Life-cycle events for RemoteClient. */ trait RemoteClientLifeCycleEvent extends RemoteLifeCycleEvent { - def remoteAddress: InetSocketAddress + def remoteAddress: RemoteAddress } case class RemoteClientError( @BeanProperty cause: Throwable, @BeanProperty remote: RemoteSupport, - @BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent + @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent case class RemoteClientDisconnected( @BeanProperty remote: RemoteSupport, - @BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent + @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent case class RemoteClientConnected( @BeanProperty remote: RemoteSupport, - @BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent + @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent case class RemoteClientStarted( @BeanProperty remote: RemoteSupport, - @BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent + @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent case class RemoteClientShutdown( @BeanProperty remote: RemoteSupport, - @BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent + @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent case class RemoteClientWriteFailed( @BeanProperty request: AnyRef, @BeanProperty cause: Throwable, @BeanProperty remote: RemoteSupport, - @BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent + @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent /** * Life-cycle events for RemoteServer. @@ -71,18 +91,18 @@ case class RemoteServerError( @BeanProperty remote: RemoteSupport) extends RemoteServerLifeCycleEvent case class RemoteServerClientConnected( @BeanProperty remote: RemoteSupport, - @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent + @BeanProperty val clientAddress: Option[RemoteAddress]) extends RemoteServerLifeCycleEvent case class RemoteServerClientDisconnected( @BeanProperty remote: RemoteSupport, - @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent + @BeanProperty val clientAddress: Option[RemoteAddress]) extends RemoteServerLifeCycleEvent case class RemoteServerClientClosed( @BeanProperty remote: RemoteSupport, - @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent + @BeanProperty val clientAddress: Option[RemoteAddress]) extends RemoteServerLifeCycleEvent case class RemoteServerWriteFailed( @BeanProperty request: AnyRef, @BeanProperty cause: Throwable, @BeanProperty server: RemoteSupport, - @BeanProperty remoteAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent + @BeanProperty remoteAddress: Option[RemoteAddress]) extends RemoteServerLifeCycleEvent /** * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. @@ -90,7 +110,7 @@ case class RemoteServerWriteFailed( class RemoteClientException private[akka] ( message: String, @BeanProperty val client: RemoteSupport, - val remoteAddress: InetSocketAddress, cause: Throwable = null) extends AkkaException(message, cause) + val remoteAddress: RemoteAddress, cause: Throwable = null) extends AkkaException(message, cause) /** * Thrown when the remote server actor dispatching fails for some reason. @@ -127,18 +147,18 @@ abstract class RemoteSupport(val app: AkkaApplication) { /** * Shuts down a specific client connected to the supplied remote address returns true if successful */ - def shutdownClientConnection(address: InetSocketAddress): Boolean + def shutdownClientConnection(address: RemoteAddress): Boolean /** * Restarts a specific client connected to the supplied remote address, but only if the client is not shut down */ - def restartClientConnection(address: InetSocketAddress): Boolean + def restartClientConnection(address: RemoteAddress): Boolean /** Methods that needs to be implemented by a transport **/ protected[akka] def send(message: Any, senderOption: Option[ActorRef], - remoteAddress: InetSocketAddress, + remoteAddress: RemoteAddress, recipient: ActorRef, loader: Option[ClassLoader]): Unit diff --git a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala index 80230e73ff..b7655e376e 100644 --- a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala +++ b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala @@ -10,6 +10,7 @@ import scala.annotation.tailrec import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } import java.net.InetSocketAddress +import akka.remote.RemoteAddress /** * An Iterable that also contains a version. @@ -71,12 +72,12 @@ trait ConnectionManager { /** * Creates a new connection (ActorRef) if it didn't exist. Atomically. */ - def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef + def putIfAbsent(address: RemoteAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef /** * Fails over connections from one address to another. */ - def failOver(from: InetSocketAddress, to: InetSocketAddress) + def failOver(from: RemoteAddress, to: RemoteAddress) } /** @@ -120,9 +121,9 @@ class LocalConnectionManager(initialConnections: Iterable[ActorRef]) extends Con } } - def failOver(from: InetSocketAddress, to: InetSocketAddress) {} // do nothing here + def failOver(from: RemoteAddress, to: RemoteAddress) {} // do nothing here - def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { + def putIfAbsent(address: RemoteAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { throw new UnsupportedOperationException("Not supported") } } diff --git a/akka-actor/src/main/scala/akka/util/RemoteAddress.scala b/akka-actor/src/main/scala/akka/util/RemoteAddress.scala deleted file mode 100644 index e4eeca7324..0000000000 --- a/akka-actor/src/main/scala/akka/util/RemoteAddress.scala +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ -package akka.util - -import java.net.InetSocketAddress - -object RemoteAddress { - def apply(hostname: String, port: Int) = new RemoteAddress(hostname, port) - def apply(inetAddress: InetSocketAddress): RemoteAddress = inetAddress match { - case null ⇒ null - case inet ⇒ new RemoteAddress(inet.getAddress.getHostAddress, inet.getPort) - } -} - -class RemoteAddress(val hostname: String, val port: Int) { - override val hashCode: Int = { - var result = HashCode.SEED - result = HashCode.hash(result, hostname) - result = HashCode.hash(result, port) - result - } - - override def equals(that: Any): Boolean = { - that.isInstanceOf[RemoteAddress] && - that.asInstanceOf[RemoteAddress].hostname == hostname && - that.asInstanceOf[RemoteAddress].port == port - } -} diff --git a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala index 1632eb9055..9f98bcdb56 100644 --- a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala @@ -4,7 +4,6 @@ package akka.remote -import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference import scala.collection.immutable.Map @@ -42,9 +41,9 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 */ private case class State( version: Long = 0L, - failureStats: Map[InetSocketAddress, FailureStats] = Map.empty[InetSocketAddress, FailureStats], - intervalHistory: Map[InetSocketAddress, Vector[Long]] = Map.empty[InetSocketAddress, Vector[Long]], - timestamps: Map[InetSocketAddress, Long] = Map.empty[InetSocketAddress, Long]) + failureStats: Map[RemoteAddress, FailureStats] = Map.empty[RemoteAddress, FailureStats], + intervalHistory: Map[RemoteAddress, Vector[Long]] = Map.empty[RemoteAddress, Vector[Long]], + timestamps: Map[RemoteAddress, Long] = Map.empty[RemoteAddress, Long]) private val state = new AtomicReference[State](State()) @@ -52,13 +51,13 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 * Returns true if the connection is considered to be up and healthy * and returns false otherwise. */ - def isAvailable(connection: InetSocketAddress): Boolean = phi(connection) < threshold + def isAvailable(connection: RemoteAddress): Boolean = phi(connection) < threshold /** * Records a heartbeat for a connection. */ @tailrec - final def heartbeat(connection: InetSocketAddress) { + final def heartbeat(connection: RemoteAddress) { val oldState = state.get val latestTimestamp = oldState.timestamps.get(connection) @@ -139,7 +138,7 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 * Implementations of 'Cumulative Distribution Function' for Exponential Distribution. * For a discussion on the math read [https://issues.apache.org/jira/browse/CASSANDRA-2597]. */ - def phi(connection: InetSocketAddress): Double = { + def phi(connection: RemoteAddress): Double = { val oldState = state.get val oldTimestamp = oldState.timestamps.get(connection) if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections @@ -154,7 +153,7 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 * Removes the heartbeat management for a connection. */ @tailrec - final def remove(connection: InetSocketAddress) { + final def remove(connection: RemoteAddress) { val oldState = state.get if (oldState.failureStats.contains(connection)) { diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 4a5efc4f40..65a928abee 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -12,9 +12,7 @@ import akka.util.duration._ import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ -import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.TimeUnit import java.security.SecureRandom import System.{ currentTimeMillis ⇒ newTimestamp } @@ -27,8 +25,8 @@ import com.google.protobuf.ByteString * Interface for node membership change listener. */ trait NodeMembershipChangeListener { - def nodeConnected(node: InetSocketAddress) - def nodeDisconnected(node: InetSocketAddress) + def nodeConnected(node: RemoteAddress) + def nodeDisconnected(node: RemoteAddress) } /** @@ -36,23 +34,23 @@ trait NodeMembershipChangeListener { */ case class Gossip( version: VectorClock, - node: InetSocketAddress, - availableNodes: Set[InetSocketAddress] = Set.empty[InetSocketAddress], - unavailableNodes: Set[InetSocketAddress] = Set.empty[InetSocketAddress]) + node: RemoteAddress, + availableNodes: Set[RemoteAddress] = Set.empty[RemoteAddress], + unavailableNodes: Set[RemoteAddress] = Set.empty[RemoteAddress]) /* // ====== NEW GOSSIP IMPLEMENTATION ====== case class Gossip( version: VectorClock, - node: InetSocketAddress, - leader: InetSocketAddress, // FIXME leader is always head of 'members', so we probably don't need this field + node: RemoteAddress, + leader: RemoteAddress, // FIXME leader is always head of 'members', so we probably don't need this field members: SortedSet[Member] = SortetSet.empty[Member](Ordering.fromLessThan[String](_ > _)), // sorted set of members with their status, sorted by name seen: Map[Member, VectorClock] = Map.empty[Member, VectorClock], // for ring convergence pendingChanges: Option[Vector[PendingPartitioningChange]] = None, // for handoff meta: Option[Map[String, Array[Byte]]] = None) // misc meta-data - case class Member(address: InetSocketAddress, status: MemberStatus) + case class Member(address: RemoteAddress, status: MemberStatus) sealed trait MemberStatus object MemberStatus { @@ -73,8 +71,8 @@ case class Gossip( type VNodeMod = AnyRef case class PendingPartitioningChange( - owner: InetSocketAddress, - nextOwner: InetSocketAddress, + owner: RemoteAddress, + nextOwner: RemoteAddress, changes: Vector[VNodeMod], status: PendingPartitioningStatus) */ @@ -107,7 +105,7 @@ class Gossiper(remote: Remote) { private val app = remote.app private val log = Logging(app, this) private val failureDetector = remote.failureDetector - private val connectionManager = new RemoteConnectionManager(app, remote, Map.empty[InetSocketAddress, ActorRef]) + private val connectionManager = new RemoteConnectionManager(app, remote, Map.empty[RemoteAddress, ActorRef]) private val seeds = Set(address) // FIXME read in list of seeds from config private val scheduler = new DefaultScheduler @@ -231,7 +229,7 @@ class Gossiper(remote: Remote) { /** * Gossips set of nodes passed in as argument. Returns 'true' if it gossiped to a "seed" node. */ - private def gossipTo(nodes: Set[InetSocketAddress]): Boolean = { + private def gossipTo(nodes: Set[RemoteAddress]): Boolean = { val peers = nodes filter (_ != address) // filter out myself val peer = selectRandomNode(peers) val oldState = state.get @@ -323,7 +321,7 @@ class Gossiper(remote: Remote) { } } - private def selectRandomNode(nodes: Set[InetSocketAddress]): InetSocketAddress = { + private def selectRandomNode(nodes: Set[RemoteAddress]): RemoteAddress = { nodes.toList(random.nextInt(nodes.size)) } } diff --git a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala index 872b2c23f3..cf88b70c92 100644 --- a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala +++ b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala @@ -4,9 +4,7 @@ package akka.remote -import akka.dispatch.PinnedDispatcher import scala.collection.mutable -import java.net.InetSocketAddress import akka.actor.{ LocalActorRef, Actor, ActorRef, Props, newUuid } import akka.actor.Actor._ import akka.AkkaApplication @@ -21,10 +19,10 @@ object NetworkEventStream { private sealed trait NetworkEventStreamEvent - private case class Register(listener: Listener, connectionAddress: InetSocketAddress) + private case class Register(listener: Listener, connectionAddress: RemoteAddress) extends NetworkEventStreamEvent - private case class Unregister(listener: Listener, connectionAddress: InetSocketAddress) + private case class Unregister(listener: Listener, connectionAddress: RemoteAddress) extends NetworkEventStreamEvent /** @@ -39,8 +37,8 @@ object NetworkEventStream { */ private class Channel extends Actor { - val listeners = new mutable.HashMap[InetSocketAddress, mutable.Set[Listener]]() { - override def default(k: InetSocketAddress) = mutable.Set.empty[Listener] + val listeners = new mutable.HashMap[RemoteAddress, mutable.Set[Listener]]() { + override def default(k: RemoteAddress) = mutable.Set.empty[Listener] } def receive = { @@ -72,12 +70,12 @@ class NetworkEventStream(val app: AkkaApplication) { /** * Registers a network event stream listener (asyncronously). */ - def register(listener: Listener, connectionAddress: InetSocketAddress) = + def register(listener: Listener, connectionAddress: RemoteAddress) = sender ! Register(listener, connectionAddress) /** * Unregisters a network event stream listener (asyncronously) . */ - def unregister(listener: Listener, connectionAddress: InetSocketAddress) = + def unregister(listener: Listener, connectionAddress: RemoteAddress) = sender ! Unregister(listener, connectionAddress) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index eca7372f3f..dc158e3d5e 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -111,9 +111,9 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) } - val connections = (Map.empty[InetSocketAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒ - val inetAddr = new InetSocketAddress(a.hostname, a.port) - conns + (inetAddr -> RemoteActorRef(remote.server, inetAddr, address, None)) + val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒ + val remoteAddress = RemoteAddress(a.hostname, a.port) + conns + (remoteAddress -> RemoteActorRef(remote.server, remoteAddress, address, None)) } val connectionManager = new RemoteConnectionManager(app, remote, connections) @@ -169,19 +169,19 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider } private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = { - if (optimizeLocalScoped_? && (actor.hostname == app.hostname || actor.hostname == app.defaultAddress.getHostName) && actor.port == app.port) { + val remoteAddress = RemoteAddress(actor.hostname, actor.port) + if (optimizeLocalScoped_? && remoteAddress == app.defaultAddress) { local.actorFor(actor.address) } else { - val remoteInetSocketAddress = new InetSocketAddress(actor.hostname, actor.port) //FIXME Drop the InetSocketAddresses and use RemoteAddress - log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", app.defaultAddress, actor.address, remoteInetSocketAddress) - Some(RemoteActorRef(remote.server, remoteInetSocketAddress, actor.address, None)) //Should it be None here + log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", app.defaultAddress, actor.address, remoteAddress) + Some(RemoteActorRef(remote.server, remoteAddress, actor.address, None)) //Should it be None here } } /** * Using (checking out) actor on a specific node. */ - def useActorOnNode(remoteAddress: InetSocketAddress, actorAddress: String, actorFactory: () ⇒ Actor) { + def useActorOnNode(remoteAddress: RemoteAddress, actorAddress: String, actorFactory: () ⇒ Actor) { log.debug("[{}] Instantiating Actor [{}] on node [{}]", app.defaultAddress, actorAddress, remoteAddress) val actorFactoryBytes = @@ -244,7 +244,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider */ private[akka] case class RemoteActorRef private[akka] ( remote: RemoteSupport, - remoteAddress: InetSocketAddress, + remoteAddress: RemoteAddress, address: String, loader: Option[ClassLoader]) extends ActorRef with ScalaActorRef { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index f76f9d072a..29b425a87e 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -12,7 +12,6 @@ import akka.event.Logging import scala.collection.immutable.Map import scala.annotation.tailrec -import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference /** @@ -23,13 +22,13 @@ import java.util.concurrent.atomic.AtomicReference class RemoteConnectionManager( app: AkkaApplication, remote: Remote, - initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef]) + initialConnections: Map[RemoteAddress, ActorRef] = Map.empty[RemoteAddress, ActorRef]) extends ConnectionManager { val log = Logging(app, this) // FIXME is this VersionedIterable really needed? It is not used I think. Complicates API. See 'def connections' etc. - case class State(version: Long, connections: Map[InetSocketAddress, ActorRef]) + case class State(version: Long, connections: Map[RemoteAddress, ActorRef]) extends VersionedIterable[ActorRef] { def iterable: Iterable[ActorRef] = connections.values } @@ -55,7 +54,7 @@ class RemoteConnectionManager( def size: Int = connections.connections.size - def connectionFor(address: InetSocketAddress): Option[ActorRef] = connections.connections.get(address) + def connectionFor(address: RemoteAddress): Option[ActorRef] = connections.connections.get(address) def isEmpty: Boolean = connections.connections.isEmpty @@ -64,7 +63,7 @@ class RemoteConnectionManager( } @tailrec - final def failOver(from: InetSocketAddress, to: InetSocketAddress) { + final def failOver(from: RemoteAddress, to: RemoteAddress) { log.debug("Failing over connection from [{}] to [{}]", from, to) val oldState = state.get @@ -95,8 +94,8 @@ class RemoteConnectionManager( val oldState = state.get() var changed = false - var faultyAddress: InetSocketAddress = null - var newConnections = Map.empty[InetSocketAddress, ActorRef] + var faultyAddress: RemoteAddress = null + var newConnections = Map.empty[RemoteAddress, ActorRef] oldState.connections.keys foreach { address ⇒ val actorRef: ActorRef = oldState.connections.get(address).get @@ -122,7 +121,7 @@ class RemoteConnectionManager( } @tailrec - final def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { + final def putIfAbsent(address: RemoteAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { val oldState = state.get() val oldConnections = oldState.connections @@ -149,7 +148,7 @@ class RemoteConnectionManager( } } - private[remote] def newConnection(actorAddress: String, inetSocketAddress: InetSocketAddress) = { + private[remote] def newConnection(actorAddress: String, inetSocketAddress: RemoteAddress) = { RemoteActorRef(remote.server, inetSocketAddress, actorAddress, None) } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 7a6f27acf9..d0c48ac442 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -36,13 +36,11 @@ class RemoteClientMessageBufferException(message: String, cause: Throwable = nul */ abstract class RemoteClient private[akka] ( val remoteSupport: NettyRemoteSupport, - val remoteAddress: InetSocketAddress) { + val remoteAddress: RemoteAddress) { val log = Logging(remoteSupport.app, this) - val name = simpleName(this) + "@" + - remoteAddress.getAddress.getHostAddress + "::" + - remoteAddress.getPort + val name = simpleName(this) + "@" + remoteAddress private[remote] val runSwitch = new Switch() @@ -54,18 +52,13 @@ abstract class RemoteClient private[akka] ( def shutdown(): Boolean - def isBoundTo(address: InetSocketAddress): Boolean = currentChannel.getRemoteAddress match { - case remoteAddress: InetSocketAddress ⇒ - address.getAddress.getHostAddress == remoteAddress.getAddress.getHostAddress && address.getPort == remoteAddress.getPort - case _ ⇒ false - } + def isBoundTo(address: RemoteAddress): Boolean = remoteAddress == address /** * Converts the message to the wireprotocol and sends the message across the wire */ - def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef) { + def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = send(remoteSupport.createRemoteMessageProtocolBuilder(Left(recipient), Right(message), senderOption).build) - } /** * Sends the message across the wire @@ -101,8 +94,7 @@ abstract class RemoteClient private[akka] ( class PassiveRemoteClient(val currentChannel: Channel, remoteSupport: NettyRemoteSupport, - remoteAddress: InetSocketAddress, - val loader: Option[ClassLoader] = None) + remoteAddress: RemoteAddress) extends RemoteClient(remoteSupport, remoteAddress) { def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn { @@ -125,7 +117,7 @@ class PassiveRemoteClient(val currentChannel: Channel, */ class ActiveRemoteClient private[akka] ( remoteSupport: NettyRemoteSupport, - remoteAddress: InetSocketAddress, + remoteAddress: RemoteAddress, val loader: Option[ClassLoader] = None) extends RemoteClient(remoteSupport, remoteAddress) { @@ -167,7 +159,7 @@ class ActiveRemoteClient private[akka] ( def attemptReconnect(): Boolean = { log.debug("Remote client reconnecting to [{}]", remoteAddress) - val connection = bootstrap.connect(remoteAddress) + val connection = bootstrap.connect(new InetSocketAddress(remoteAddress.hostname, remoteAddress.port)) openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { @@ -190,7 +182,7 @@ class ActiveRemoteClient private[akka] ( log.debug("Starting remote client connection to [{}]", remoteAddress) - connection = bootstrap.connect(remoteAddress) + connection = bootstrap.connect(new InetSocketAddress(remoteAddress.hostname, remoteAddress.port)) val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) @@ -253,7 +245,7 @@ class ActiveRemoteClient private[akka] ( class ActiveRemoteClientPipelineFactory( name: String, bootstrap: ClientBootstrap, - remoteAddress: InetSocketAddress, + remoteAddress: RemoteAddress, timer: HashedWheelTimer, client: ActiveRemoteClient) extends ChannelPipelineFactory { @@ -278,7 +270,7 @@ class ActiveRemoteClientPipelineFactory( class ActiveRemoteClientHandler( val name: String, val bootstrap: ClientBootstrap, - val remoteAddress: InetSocketAddress, + val remoteAddress: RemoteAddress, val timer: HashedWheelTimer, val client: ActiveRemoteClient) extends SimpleChannelUpstreamHandler { @@ -366,26 +358,25 @@ class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with protected[akka] def send(message: Any, senderOption: Option[ActorRef], - recipientAddress: InetSocketAddress, + recipientAddress: RemoteAddress, recipient: ActorRef, loader: Option[ClassLoader]): Unit = { - val key = RemoteAddress(recipientAddress) clientsLock.readLock.lock try { - val client = remoteClients.get(key) match { + val client = remoteClients.get(recipientAddress) match { case Some(client) ⇒ client case None ⇒ clientsLock.readLock.unlock clientsLock.writeLock.lock //Lock upgrade, not supported natively try { try { - remoteClients.get(key) match { + remoteClients.get(recipientAddress) match { //Recheck for addition, race between upgrades case Some(client) ⇒ client //If already populated by other writer case None ⇒ //Populate map val client = new ActiveRemoteClient(this, recipientAddress, loader) client.connect() - remoteClients += key -> client + remoteClients += recipientAddress -> client client } } finally { @@ -401,31 +392,30 @@ class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with } } - def bindClient(inetAddress: InetSocketAddress, client: RemoteClient, putIfAbsent: Boolean = false): Boolean = clientsLock withWriteGuard { - val address = RemoteAddress(inetAddress) - if (putIfAbsent && remoteClients.contains(address)) false + def bindClient(remoteAddress: RemoteAddress, client: RemoteClient, putIfAbsent: Boolean = false): Boolean = clientsLock withWriteGuard { + if (putIfAbsent && remoteClients.contains(remoteAddress)) false else { client.connect() - remoteClients.put(address, client).foreach(_.shutdown()) + remoteClients.put(remoteAddress, client).foreach(_.shutdown()) true } } - def unbindClient(inetAddress: InetSocketAddress): Unit = clientsLock withWriteGuard { + def unbindClient(remoteAddress: RemoteAddress): Unit = clientsLock withWriteGuard { remoteClients.foreach { - case (k, v) ⇒ if (v.isBoundTo(inetAddress)) { v.shutdown(); remoteClients.remove(k) } + case (k, v) ⇒ if (v.isBoundTo(remoteAddress)) { v.shutdown(); remoteClients.remove(k) } } } - def shutdownClientConnection(address: InetSocketAddress): Boolean = clientsLock withWriteGuard { - remoteClients.remove(RemoteAddress(address)) match { + def shutdownClientConnection(remoteAddress: RemoteAddress): Boolean = clientsLock withWriteGuard { + remoteClients.remove(remoteAddress) match { case Some(client) ⇒ client.shutdown() case None ⇒ false } } - def restartClientConnection(address: InetSocketAddress): Boolean = clientsLock withReadGuard { - remoteClients.get(RemoteAddress(address)) match { + def restartClientConnection(remoteAddress: RemoteAddress): Boolean = clientsLock withReadGuard { + remoteClients.get(remoteAddress) match { case Some(client) ⇒ client.connect(reconnectIfAlreadyConnected = true) case None ⇒ false } @@ -469,8 +459,9 @@ class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Option[ClassLoader]) { val log = Logging(remoteSupport.app, this) import remoteSupport.serverSettings._ + import remoteSupport.app.defaultAddress - val name = "NettyRemoteServer@" + remoteSupport.app.hostname + ":" + remoteSupport.app.port + val name = "NettyRemoteServer@" + defaultAddress private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool) @@ -487,7 +478,7 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio bootstrap.setOption("child.reuseAddress", true) bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT.toMillis) - openChannels.add(bootstrap.bind(remoteSupport.app.defaultAddress)) + openChannels.add(bootstrap.bind(new InetSocketAddress(defaultAddress.hostname, defaultAddress.port))) remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport)) def shutdown() { @@ -620,8 +611,8 @@ class RemoteServerHandler( instruction.getCommandType match { case CommandType.CONNECT if USE_PASSIVE_CONNECTIONS ⇒ val origin = instruction.getOrigin - val inbound = new InetSocketAddress(origin.getHostname, origin.getPort) - val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound, applicationLoader) + val inbound = RemoteAddress(origin.getHostname, origin.getPort) + val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound) remoteSupport.bindClient(inbound, client) case CommandType.SHUTDOWN ⇒ //TODO FIXME Dispose passive connection here case _ ⇒ //Unknown command @@ -637,9 +628,9 @@ class RemoteServerHandler( event.getChannel.close } - private def getClientAddress(c: Channel): Option[InetSocketAddress] = + private def getClientAddress(c: Channel): Option[RemoteAddress] = c.getRemoteAddress match { - case inet: InetSocketAddress ⇒ Some(inet) + case inet: InetSocketAddress ⇒ Some(RemoteAddress(inet)) case _ ⇒ None } } diff --git a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala index 6a28415636..5b94895756 100644 --- a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala @@ -8,10 +8,10 @@ import java.net.InetSocketAddress class AccrualFailureDetectorSpec extends WordSpec with MustMatchers { "An AccrualFailureDetector" should { + val conn = RemoteAddress(new InetSocketAddress("localhost", 2552)) "mark node as available after a series of successful heartbeats" in { val fd = new AccrualFailureDetector - val conn = new InetSocketAddress("localhost", 2552) fd.heartbeat(conn) @@ -27,7 +27,6 @@ class AccrualFailureDetectorSpec extends WordSpec with MustMatchers { // FIXME how should we deal with explicit removal of connection? - if triggered as failure then we have a problem in boostrap - see line 142 in AccrualFailureDetector "mark node as dead after explicit removal of connection" ignore { val fd = new AccrualFailureDetector - val conn = new InetSocketAddress("localhost", 2552) fd.heartbeat(conn) @@ -46,7 +45,6 @@ class AccrualFailureDetectorSpec extends WordSpec with MustMatchers { "mark node as dead if heartbeat are missed" in { val fd = new AccrualFailureDetector(threshold = 3) - val conn = new InetSocketAddress("localhost", 2552) fd.heartbeat(conn) @@ -65,7 +63,6 @@ class AccrualFailureDetectorSpec extends WordSpec with MustMatchers { "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { val fd = new AccrualFailureDetector(threshold = 3) - val conn = new InetSocketAddress("localhost", 2552) fd.heartbeat(conn)