Removing InetSocketAddress as much as possible from the remoting, switching to RemoteAddress for an easier way forward with different transports. Also removing quite a few allocations internally in the remoting as a side-efect of this.

This commit is contained in:
Viktor Klang 2011-11-10 17:39:04 +01:00
parent 0800511ac9
commit ba9281e267
12 changed files with 122 additions and 146 deletions

View file

@ -14,9 +14,9 @@ import akka.dispatch.{ Dispatchers, Future }
import akka.util.Duration import akka.util.Duration
import akka.util.ReflectiveAccess import akka.util.ReflectiveAccess
import akka.routing.Routing import akka.routing.Routing
import akka.remote.RemoteSupport
import akka.serialization.Serialization import akka.serialization.Serialization
import java.net.InetSocketAddress import java.net.InetSocketAddress
import remote.{ RemoteAddress, RemoteSupport }
object AkkaApplication { object AkkaApplication {
@ -155,7 +155,7 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
case value value case value value
} }
val defaultAddress = new InetSocketAddress(System.getProperty("akka.remote.hostname") match { val defaultAddress = RemoteAddress(System.getProperty("akka.remote.hostname") match {
case null | "" InetAddress.getLocalHost.getHostAddress case null | "" InetAddress.getLocalHost.getHostAddress
case value value case value value
}, System.getProperty("akka.remote.port") match { }, System.getProperty("akka.remote.port") match {
@ -163,9 +163,9 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
case value value.toInt case value value.toInt
}) })
def hostname: String = defaultAddress.getAddress.getHostAddress def hostname: String = defaultAddress.hostname
def port: Int = defaultAddress.getPort def port: Int = defaultAddress.port
// this provides basic logging (to stdout) until .start() is called below // this provides basic logging (to stdout) until .start() is called below
val mainbus = new MainBus(DebugMainBus) val mainbus = new MainBus(DebugMainBus)

View file

@ -13,6 +13,7 @@ import akka.event.ActorEventBus
import akka.serialization.Serialization import akka.serialization.Serialization
import akka.actor.DeadLetterActorRef.SerializedDeadLetterActorRef import akka.actor.DeadLetterActorRef.SerializedDeadLetterActorRef
import java.net.InetSocketAddress import java.net.InetSocketAddress
import akka.remote.RemoteAddress
/** /**
* ActorRef is an immutable and serializable handle to an Actor. * ActorRef is an immutable and serializable handle to an Actor.
@ -286,7 +287,8 @@ trait ScalaActorRef { ref: ActorRef ⇒
case class SerializedActorRef(address: String, hostname: String, port: Int) { case class SerializedActorRef(address: String, hostname: String, port: Int) {
import akka.serialization.Serialization.app import akka.serialization.Serialization.app
def this(address: String, inet: InetSocketAddress) = this(address, inet.getAddress.getHostAddress, inet.getPort) def this(address: String, remoteAddress: RemoteAddress) = this(address, remoteAddress.hostname, remoteAddress.port)
def this(address: String, remoteAddress: InetSocketAddress) = this(address, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort) //TODO FIXME REMOVE
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
def readResolve(): AnyRef = { def readResolve(): AnyRef = {

View file

@ -8,9 +8,29 @@ import akka.actor._
import akka.{ AkkaException, AkkaApplication } import akka.{ AkkaException, AkkaApplication }
import scala.reflect.BeanProperty import scala.reflect.BeanProperty
import java.io.{ PrintWriter, PrintStream }
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.io.{ PrintWriter, PrintStream }
object RemoteAddress {
def apply(host: String, port: Int): RemoteAddress = apply(new InetSocketAddress(host, port))
def apply(inetAddress: InetSocketAddress): RemoteAddress = inetAddress match {
case null null
case inet
val host = inet.getAddress.getHostAddress
val portNo = inet.getPort
new RemoteAddress {
def hostname = host
def port = portNo
}
}
}
trait RemoteAddress {
def hostname: String
def port: Int
override def toString = "" + hostname + ":" + port
}
class RemoteException(message: String) extends AkkaException(message) class RemoteException(message: String) extends AkkaException(message)
@ -27,35 +47,35 @@ sealed trait RemoteLifeCycleEvent
* Life-cycle events for RemoteClient. * Life-cycle events for RemoteClient.
*/ */
trait RemoteClientLifeCycleEvent extends RemoteLifeCycleEvent { trait RemoteClientLifeCycleEvent extends RemoteLifeCycleEvent {
def remoteAddress: InetSocketAddress def remoteAddress: RemoteAddress
} }
case class RemoteClientError( case class RemoteClientError(
@BeanProperty cause: Throwable, @BeanProperty cause: Throwable,
@BeanProperty remote: RemoteSupport, @BeanProperty remote: RemoteSupport,
@BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected( case class RemoteClientDisconnected(
@BeanProperty remote: RemoteSupport, @BeanProperty remote: RemoteSupport,
@BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected( case class RemoteClientConnected(
@BeanProperty remote: RemoteSupport, @BeanProperty remote: RemoteSupport,
@BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientStarted( case class RemoteClientStarted(
@BeanProperty remote: RemoteSupport, @BeanProperty remote: RemoteSupport,
@BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientShutdown( case class RemoteClientShutdown(
@BeanProperty remote: RemoteSupport, @BeanProperty remote: RemoteSupport,
@BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientWriteFailed( case class RemoteClientWriteFailed(
@BeanProperty request: AnyRef, @BeanProperty request: AnyRef,
@BeanProperty cause: Throwable, @BeanProperty cause: Throwable,
@BeanProperty remote: RemoteSupport, @BeanProperty remote: RemoteSupport,
@BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent @BeanProperty remoteAddress: RemoteAddress) extends RemoteClientLifeCycleEvent
/** /**
* Life-cycle events for RemoteServer. * Life-cycle events for RemoteServer.
@ -71,18 +91,18 @@ case class RemoteServerError(
@BeanProperty remote: RemoteSupport) extends RemoteServerLifeCycleEvent @BeanProperty remote: RemoteSupport) extends RemoteServerLifeCycleEvent
case class RemoteServerClientConnected( case class RemoteServerClientConnected(
@BeanProperty remote: RemoteSupport, @BeanProperty remote: RemoteSupport,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent @BeanProperty val clientAddress: Option[RemoteAddress]) extends RemoteServerLifeCycleEvent
case class RemoteServerClientDisconnected( case class RemoteServerClientDisconnected(
@BeanProperty remote: RemoteSupport, @BeanProperty remote: RemoteSupport,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent @BeanProperty val clientAddress: Option[RemoteAddress]) extends RemoteServerLifeCycleEvent
case class RemoteServerClientClosed( case class RemoteServerClientClosed(
@BeanProperty remote: RemoteSupport, @BeanProperty remote: RemoteSupport,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent @BeanProperty val clientAddress: Option[RemoteAddress]) extends RemoteServerLifeCycleEvent
case class RemoteServerWriteFailed( case class RemoteServerWriteFailed(
@BeanProperty request: AnyRef, @BeanProperty request: AnyRef,
@BeanProperty cause: Throwable, @BeanProperty cause: Throwable,
@BeanProperty server: RemoteSupport, @BeanProperty server: RemoteSupport,
@BeanProperty remoteAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent @BeanProperty remoteAddress: Option[RemoteAddress]) extends RemoteServerLifeCycleEvent
/** /**
* Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down.
@ -90,7 +110,7 @@ case class RemoteServerWriteFailed(
class RemoteClientException private[akka] ( class RemoteClientException private[akka] (
message: String, message: String,
@BeanProperty val client: RemoteSupport, @BeanProperty val client: RemoteSupport,
val remoteAddress: InetSocketAddress, cause: Throwable = null) extends AkkaException(message, cause) val remoteAddress: RemoteAddress, cause: Throwable = null) extends AkkaException(message, cause)
/** /**
* Thrown when the remote server actor dispatching fails for some reason. * Thrown when the remote server actor dispatching fails for some reason.
@ -127,18 +147,18 @@ abstract class RemoteSupport(val app: AkkaApplication) {
/** /**
* Shuts down a specific client connected to the supplied remote address returns true if successful * Shuts down a specific client connected to the supplied remote address returns true if successful
*/ */
def shutdownClientConnection(address: InetSocketAddress): Boolean def shutdownClientConnection(address: RemoteAddress): Boolean
/** /**
* Restarts a specific client connected to the supplied remote address, but only if the client is not shut down * Restarts a specific client connected to the supplied remote address, but only if the client is not shut down
*/ */
def restartClientConnection(address: InetSocketAddress): Boolean def restartClientConnection(address: RemoteAddress): Boolean
/** Methods that needs to be implemented by a transport **/ /** Methods that needs to be implemented by a transport **/
protected[akka] def send(message: Any, protected[akka] def send(message: Any,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
remoteAddress: InetSocketAddress, remoteAddress: RemoteAddress,
recipient: ActorRef, recipient: ActorRef,
loader: Option[ClassLoader]): Unit loader: Option[ClassLoader]): Unit

View file

@ -10,6 +10,7 @@ import scala.annotation.tailrec
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
import java.net.InetSocketAddress import java.net.InetSocketAddress
import akka.remote.RemoteAddress
/** /**
* An Iterable that also contains a version. * An Iterable that also contains a version.
@ -71,12 +72,12 @@ trait ConnectionManager {
/** /**
* Creates a new connection (ActorRef) if it didn't exist. Atomically. * Creates a new connection (ActorRef) if it didn't exist. Atomically.
*/ */
def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ActorRef): ActorRef def putIfAbsent(address: RemoteAddress, newConnectionFactory: () ActorRef): ActorRef
/** /**
* Fails over connections from one address to another. * Fails over connections from one address to another.
*/ */
def failOver(from: InetSocketAddress, to: InetSocketAddress) def failOver(from: RemoteAddress, to: RemoteAddress)
} }
/** /**
@ -120,9 +121,9 @@ class LocalConnectionManager(initialConnections: Iterable[ActorRef]) extends Con
} }
} }
def failOver(from: InetSocketAddress, to: InetSocketAddress) {} // do nothing here def failOver(from: RemoteAddress, to: RemoteAddress) {} // do nothing here
def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ActorRef): ActorRef = { def putIfAbsent(address: RemoteAddress, newConnectionFactory: () ActorRef): ActorRef = {
throw new UnsupportedOperationException("Not supported") throw new UnsupportedOperationException("Not supported")
} }
} }

View file

@ -1,29 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
import java.net.InetSocketAddress
object RemoteAddress {
def apply(hostname: String, port: Int) = new RemoteAddress(hostname, port)
def apply(inetAddress: InetSocketAddress): RemoteAddress = inetAddress match {
case null null
case inet new RemoteAddress(inet.getAddress.getHostAddress, inet.getPort)
}
}
class RemoteAddress(val hostname: String, val port: Int) {
override val hashCode: Int = {
var result = HashCode.SEED
result = HashCode.hash(result, hostname)
result = HashCode.hash(result, port)
result
}
override def equals(that: Any): Boolean = {
that.isInstanceOf[RemoteAddress] &&
that.asInstanceOf[RemoteAddress].hostname == hostname &&
that.asInstanceOf[RemoteAddress].port == port
}
}

View file

@ -4,7 +4,6 @@
package akka.remote package akka.remote
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import scala.collection.immutable.Map import scala.collection.immutable.Map
@ -42,9 +41,9 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10
*/ */
private case class State( private case class State(
version: Long = 0L, version: Long = 0L,
failureStats: Map[InetSocketAddress, FailureStats] = Map.empty[InetSocketAddress, FailureStats], failureStats: Map[RemoteAddress, FailureStats] = Map.empty[RemoteAddress, FailureStats],
intervalHistory: Map[InetSocketAddress, Vector[Long]] = Map.empty[InetSocketAddress, Vector[Long]], intervalHistory: Map[RemoteAddress, Vector[Long]] = Map.empty[RemoteAddress, Vector[Long]],
timestamps: Map[InetSocketAddress, Long] = Map.empty[InetSocketAddress, Long]) timestamps: Map[RemoteAddress, Long] = Map.empty[RemoteAddress, Long])
private val state = new AtomicReference[State](State()) private val state = new AtomicReference[State](State())
@ -52,13 +51,13 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10
* Returns true if the connection is considered to be up and healthy * Returns true if the connection is considered to be up and healthy
* and returns false otherwise. * and returns false otherwise.
*/ */
def isAvailable(connection: InetSocketAddress): Boolean = phi(connection) < threshold def isAvailable(connection: RemoteAddress): Boolean = phi(connection) < threshold
/** /**
* Records a heartbeat for a connection. * Records a heartbeat for a connection.
*/ */
@tailrec @tailrec
final def heartbeat(connection: InetSocketAddress) { final def heartbeat(connection: RemoteAddress) {
val oldState = state.get val oldState = state.get
val latestTimestamp = oldState.timestamps.get(connection) val latestTimestamp = oldState.timestamps.get(connection)
@ -139,7 +138,7 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10
* Implementations of 'Cumulative Distribution Function' for Exponential Distribution. * Implementations of 'Cumulative Distribution Function' for Exponential Distribution.
* For a discussion on the math read [https://issues.apache.org/jira/browse/CASSANDRA-2597]. * For a discussion on the math read [https://issues.apache.org/jira/browse/CASSANDRA-2597].
*/ */
def phi(connection: InetSocketAddress): Double = { def phi(connection: RemoteAddress): Double = {
val oldState = state.get val oldState = state.get
val oldTimestamp = oldState.timestamps.get(connection) val oldTimestamp = oldState.timestamps.get(connection)
if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
@ -154,7 +153,7 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10
* Removes the heartbeat management for a connection. * Removes the heartbeat management for a connection.
*/ */
@tailrec @tailrec
final def remove(connection: InetSocketAddress) { final def remove(connection: RemoteAddress) {
val oldState = state.get val oldState = state.get
if (oldState.failureStats.contains(connection)) { if (oldState.failureStats.contains(connection)) {

View file

@ -12,9 +12,7 @@ import akka.util.duration._
import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.TimeUnit
import java.security.SecureRandom import java.security.SecureRandom
import System.{ currentTimeMillis newTimestamp } import System.{ currentTimeMillis newTimestamp }
@ -27,8 +25,8 @@ import com.google.protobuf.ByteString
* Interface for node membership change listener. * Interface for node membership change listener.
*/ */
trait NodeMembershipChangeListener { trait NodeMembershipChangeListener {
def nodeConnected(node: InetSocketAddress) def nodeConnected(node: RemoteAddress)
def nodeDisconnected(node: InetSocketAddress) def nodeDisconnected(node: RemoteAddress)
} }
/** /**
@ -36,23 +34,23 @@ trait NodeMembershipChangeListener {
*/ */
case class Gossip( case class Gossip(
version: VectorClock, version: VectorClock,
node: InetSocketAddress, node: RemoteAddress,
availableNodes: Set[InetSocketAddress] = Set.empty[InetSocketAddress], availableNodes: Set[RemoteAddress] = Set.empty[RemoteAddress],
unavailableNodes: Set[InetSocketAddress] = Set.empty[InetSocketAddress]) unavailableNodes: Set[RemoteAddress] = Set.empty[RemoteAddress])
/* /*
// ====== NEW GOSSIP IMPLEMENTATION ====== // ====== NEW GOSSIP IMPLEMENTATION ======
case class Gossip( case class Gossip(
version: VectorClock, version: VectorClock,
node: InetSocketAddress, node: RemoteAddress,
leader: InetSocketAddress, // FIXME leader is always head of 'members', so we probably don't need this field leader: RemoteAddress, // FIXME leader is always head of 'members', so we probably don't need this field
members: SortedSet[Member] = SortetSet.empty[Member](Ordering.fromLessThan[String](_ > _)), // sorted set of members with their status, sorted by name members: SortedSet[Member] = SortetSet.empty[Member](Ordering.fromLessThan[String](_ > _)), // sorted set of members with their status, sorted by name
seen: Map[Member, VectorClock] = Map.empty[Member, VectorClock], // for ring convergence seen: Map[Member, VectorClock] = Map.empty[Member, VectorClock], // for ring convergence
pendingChanges: Option[Vector[PendingPartitioningChange]] = None, // for handoff pendingChanges: Option[Vector[PendingPartitioningChange]] = None, // for handoff
meta: Option[Map[String, Array[Byte]]] = None) // misc meta-data meta: Option[Map[String, Array[Byte]]] = None) // misc meta-data
case class Member(address: InetSocketAddress, status: MemberStatus) case class Member(address: RemoteAddress, status: MemberStatus)
sealed trait MemberStatus sealed trait MemberStatus
object MemberStatus { object MemberStatus {
@ -73,8 +71,8 @@ case class Gossip(
type VNodeMod = AnyRef type VNodeMod = AnyRef
case class PendingPartitioningChange( case class PendingPartitioningChange(
owner: InetSocketAddress, owner: RemoteAddress,
nextOwner: InetSocketAddress, nextOwner: RemoteAddress,
changes: Vector[VNodeMod], changes: Vector[VNodeMod],
status: PendingPartitioningStatus) status: PendingPartitioningStatus)
*/ */
@ -107,7 +105,7 @@ class Gossiper(remote: Remote) {
private val app = remote.app private val app = remote.app
private val log = Logging(app, this) private val log = Logging(app, this)
private val failureDetector = remote.failureDetector private val failureDetector = remote.failureDetector
private val connectionManager = new RemoteConnectionManager(app, remote, Map.empty[InetSocketAddress, ActorRef]) private val connectionManager = new RemoteConnectionManager(app, remote, Map.empty[RemoteAddress, ActorRef])
private val seeds = Set(address) // FIXME read in list of seeds from config private val seeds = Set(address) // FIXME read in list of seeds from config
private val scheduler = new DefaultScheduler private val scheduler = new DefaultScheduler
@ -231,7 +229,7 @@ class Gossiper(remote: Remote) {
/** /**
* Gossips set of nodes passed in as argument. Returns 'true' if it gossiped to a "seed" node. * Gossips set of nodes passed in as argument. Returns 'true' if it gossiped to a "seed" node.
*/ */
private def gossipTo(nodes: Set[InetSocketAddress]): Boolean = { private def gossipTo(nodes: Set[RemoteAddress]): Boolean = {
val peers = nodes filter (_ != address) // filter out myself val peers = nodes filter (_ != address) // filter out myself
val peer = selectRandomNode(peers) val peer = selectRandomNode(peers)
val oldState = state.get val oldState = state.get
@ -323,7 +321,7 @@ class Gossiper(remote: Remote) {
} }
} }
private def selectRandomNode(nodes: Set[InetSocketAddress]): InetSocketAddress = { private def selectRandomNode(nodes: Set[RemoteAddress]): RemoteAddress = {
nodes.toList(random.nextInt(nodes.size)) nodes.toList(random.nextInt(nodes.size))
} }
} }

View file

@ -4,9 +4,7 @@
package akka.remote package akka.remote
import akka.dispatch.PinnedDispatcher
import scala.collection.mutable import scala.collection.mutable
import java.net.InetSocketAddress
import akka.actor.{ LocalActorRef, Actor, ActorRef, Props, newUuid } import akka.actor.{ LocalActorRef, Actor, ActorRef, Props, newUuid }
import akka.actor.Actor._ import akka.actor.Actor._
import akka.AkkaApplication import akka.AkkaApplication
@ -21,10 +19,10 @@ object NetworkEventStream {
private sealed trait NetworkEventStreamEvent private sealed trait NetworkEventStreamEvent
private case class Register(listener: Listener, connectionAddress: InetSocketAddress) private case class Register(listener: Listener, connectionAddress: RemoteAddress)
extends NetworkEventStreamEvent extends NetworkEventStreamEvent
private case class Unregister(listener: Listener, connectionAddress: InetSocketAddress) private case class Unregister(listener: Listener, connectionAddress: RemoteAddress)
extends NetworkEventStreamEvent extends NetworkEventStreamEvent
/** /**
@ -39,8 +37,8 @@ object NetworkEventStream {
*/ */
private class Channel extends Actor { private class Channel extends Actor {
val listeners = new mutable.HashMap[InetSocketAddress, mutable.Set[Listener]]() { val listeners = new mutable.HashMap[RemoteAddress, mutable.Set[Listener]]() {
override def default(k: InetSocketAddress) = mutable.Set.empty[Listener] override def default(k: RemoteAddress) = mutable.Set.empty[Listener]
} }
def receive = { def receive = {
@ -72,12 +70,12 @@ class NetworkEventStream(val app: AkkaApplication) {
/** /**
* Registers a network event stream listener (asyncronously). * Registers a network event stream listener (asyncronously).
*/ */
def register(listener: Listener, connectionAddress: InetSocketAddress) = def register(listener: Listener, connectionAddress: RemoteAddress) =
sender ! Register(listener, connectionAddress) sender ! Register(listener, connectionAddress)
/** /**
* Unregisters a network event stream listener (asyncronously) . * Unregisters a network event stream listener (asyncronously) .
*/ */
def unregister(listener: Listener, connectionAddress: InetSocketAddress) = def unregister(listener: Listener, connectionAddress: RemoteAddress) =
sender ! Unregister(listener, connectionAddress) sender ! Unregister(listener, connectionAddress)
} }

View file

@ -111,9 +111,9 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
case RouterType.Custom(implClass) () Routing.createCustomRouter(implClass) case RouterType.Custom(implClass) () Routing.createCustomRouter(implClass)
} }
val connections = (Map.empty[InetSocketAddress, ActorRef] /: remoteAddresses) { (conns, a) val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a)
val inetAddr = new InetSocketAddress(a.hostname, a.port) val remoteAddress = RemoteAddress(a.hostname, a.port)
conns + (inetAddr -> RemoteActorRef(remote.server, inetAddr, address, None)) conns + (remoteAddress -> RemoteActorRef(remote.server, remoteAddress, address, None))
} }
val connectionManager = new RemoteConnectionManager(app, remote, connections) val connectionManager = new RemoteConnectionManager(app, remote, connections)
@ -169,19 +169,19 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
} }
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = { private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = {
if (optimizeLocalScoped_? && (actor.hostname == app.hostname || actor.hostname == app.defaultAddress.getHostName) && actor.port == app.port) { val remoteAddress = RemoteAddress(actor.hostname, actor.port)
if (optimizeLocalScoped_? && remoteAddress == app.defaultAddress) {
local.actorFor(actor.address) local.actorFor(actor.address)
} else { } else {
val remoteInetSocketAddress = new InetSocketAddress(actor.hostname, actor.port) //FIXME Drop the InetSocketAddresses and use RemoteAddress log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", app.defaultAddress, actor.address, remoteAddress)
log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", app.defaultAddress, actor.address, remoteInetSocketAddress) Some(RemoteActorRef(remote.server, remoteAddress, actor.address, None)) //Should it be None here
Some(RemoteActorRef(remote.server, remoteInetSocketAddress, actor.address, None)) //Should it be None here
} }
} }
/** /**
* Using (checking out) actor on a specific node. * Using (checking out) actor on a specific node.
*/ */
def useActorOnNode(remoteAddress: InetSocketAddress, actorAddress: String, actorFactory: () Actor) { def useActorOnNode(remoteAddress: RemoteAddress, actorAddress: String, actorFactory: () Actor) {
log.debug("[{}] Instantiating Actor [{}] on node [{}]", app.defaultAddress, actorAddress, remoteAddress) log.debug("[{}] Instantiating Actor [{}] on node [{}]", app.defaultAddress, actorAddress, remoteAddress)
val actorFactoryBytes = val actorFactoryBytes =
@ -244,7 +244,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
*/ */
private[akka] case class RemoteActorRef private[akka] ( private[akka] case class RemoteActorRef private[akka] (
remote: RemoteSupport, remote: RemoteSupport,
remoteAddress: InetSocketAddress, remoteAddress: RemoteAddress,
address: String, address: String,
loader: Option[ClassLoader]) loader: Option[ClassLoader])
extends ActorRef with ScalaActorRef { extends ActorRef with ScalaActorRef {

View file

@ -12,7 +12,6 @@ import akka.event.Logging
import scala.collection.immutable.Map import scala.collection.immutable.Map
import scala.annotation.tailrec import scala.annotation.tailrec
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
/** /**
@ -23,13 +22,13 @@ import java.util.concurrent.atomic.AtomicReference
class RemoteConnectionManager( class RemoteConnectionManager(
app: AkkaApplication, app: AkkaApplication,
remote: Remote, remote: Remote,
initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef]) initialConnections: Map[RemoteAddress, ActorRef] = Map.empty[RemoteAddress, ActorRef])
extends ConnectionManager { extends ConnectionManager {
val log = Logging(app, this) val log = Logging(app, this)
// FIXME is this VersionedIterable really needed? It is not used I think. Complicates API. See 'def connections' etc. // FIXME is this VersionedIterable really needed? It is not used I think. Complicates API. See 'def connections' etc.
case class State(version: Long, connections: Map[InetSocketAddress, ActorRef]) case class State(version: Long, connections: Map[RemoteAddress, ActorRef])
extends VersionedIterable[ActorRef] { extends VersionedIterable[ActorRef] {
def iterable: Iterable[ActorRef] = connections.values def iterable: Iterable[ActorRef] = connections.values
} }
@ -55,7 +54,7 @@ class RemoteConnectionManager(
def size: Int = connections.connections.size def size: Int = connections.connections.size
def connectionFor(address: InetSocketAddress): Option[ActorRef] = connections.connections.get(address) def connectionFor(address: RemoteAddress): Option[ActorRef] = connections.connections.get(address)
def isEmpty: Boolean = connections.connections.isEmpty def isEmpty: Boolean = connections.connections.isEmpty
@ -64,7 +63,7 @@ class RemoteConnectionManager(
} }
@tailrec @tailrec
final def failOver(from: InetSocketAddress, to: InetSocketAddress) { final def failOver(from: RemoteAddress, to: RemoteAddress) {
log.debug("Failing over connection from [{}] to [{}]", from, to) log.debug("Failing over connection from [{}] to [{}]", from, to)
val oldState = state.get val oldState = state.get
@ -95,8 +94,8 @@ class RemoteConnectionManager(
val oldState = state.get() val oldState = state.get()
var changed = false var changed = false
var faultyAddress: InetSocketAddress = null var faultyAddress: RemoteAddress = null
var newConnections = Map.empty[InetSocketAddress, ActorRef] var newConnections = Map.empty[RemoteAddress, ActorRef]
oldState.connections.keys foreach { address oldState.connections.keys foreach { address
val actorRef: ActorRef = oldState.connections.get(address).get val actorRef: ActorRef = oldState.connections.get(address).get
@ -122,7 +121,7 @@ class RemoteConnectionManager(
} }
@tailrec @tailrec
final def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ActorRef): ActorRef = { final def putIfAbsent(address: RemoteAddress, newConnectionFactory: () ActorRef): ActorRef = {
val oldState = state.get() val oldState = state.get()
val oldConnections = oldState.connections val oldConnections = oldState.connections
@ -149,7 +148,7 @@ class RemoteConnectionManager(
} }
} }
private[remote] def newConnection(actorAddress: String, inetSocketAddress: InetSocketAddress) = { private[remote] def newConnection(actorAddress: String, inetSocketAddress: RemoteAddress) = {
RemoteActorRef(remote.server, inetSocketAddress, actorAddress, None) RemoteActorRef(remote.server, inetSocketAddress, actorAddress, None)
} }
} }

View file

@ -36,13 +36,11 @@ class RemoteClientMessageBufferException(message: String, cause: Throwable = nul
*/ */
abstract class RemoteClient private[akka] ( abstract class RemoteClient private[akka] (
val remoteSupport: NettyRemoteSupport, val remoteSupport: NettyRemoteSupport,
val remoteAddress: InetSocketAddress) { val remoteAddress: RemoteAddress) {
val log = Logging(remoteSupport.app, this) val log = Logging(remoteSupport.app, this)
val name = simpleName(this) + "@" + val name = simpleName(this) + "@" + remoteAddress
remoteAddress.getAddress.getHostAddress + "::" +
remoteAddress.getPort
private[remote] val runSwitch = new Switch() private[remote] val runSwitch = new Switch()
@ -54,18 +52,13 @@ abstract class RemoteClient private[akka] (
def shutdown(): Boolean def shutdown(): Boolean
def isBoundTo(address: InetSocketAddress): Boolean = currentChannel.getRemoteAddress match { def isBoundTo(address: RemoteAddress): Boolean = remoteAddress == address
case remoteAddress: InetSocketAddress
address.getAddress.getHostAddress == remoteAddress.getAddress.getHostAddress && address.getPort == remoteAddress.getPort
case _ false
}
/** /**
* Converts the message to the wireprotocol and sends the message across the wire * Converts the message to the wireprotocol and sends the message across the wire
*/ */
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef) { def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit =
send(remoteSupport.createRemoteMessageProtocolBuilder(Left(recipient), Right(message), senderOption).build) send(remoteSupport.createRemoteMessageProtocolBuilder(Left(recipient), Right(message), senderOption).build)
}
/** /**
* Sends the message across the wire * Sends the message across the wire
@ -101,8 +94,7 @@ abstract class RemoteClient private[akka] (
class PassiveRemoteClient(val currentChannel: Channel, class PassiveRemoteClient(val currentChannel: Channel,
remoteSupport: NettyRemoteSupport, remoteSupport: NettyRemoteSupport,
remoteAddress: InetSocketAddress, remoteAddress: RemoteAddress)
val loader: Option[ClassLoader] = None)
extends RemoteClient(remoteSupport, remoteAddress) { extends RemoteClient(remoteSupport, remoteAddress) {
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn { def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn {
@ -125,7 +117,7 @@ class PassiveRemoteClient(val currentChannel: Channel,
*/ */
class ActiveRemoteClient private[akka] ( class ActiveRemoteClient private[akka] (
remoteSupport: NettyRemoteSupport, remoteSupport: NettyRemoteSupport,
remoteAddress: InetSocketAddress, remoteAddress: RemoteAddress,
val loader: Option[ClassLoader] = None) val loader: Option[ClassLoader] = None)
extends RemoteClient(remoteSupport, remoteAddress) { extends RemoteClient(remoteSupport, remoteAddress) {
@ -167,7 +159,7 @@ class ActiveRemoteClient private[akka] (
def attemptReconnect(): Boolean = { def attemptReconnect(): Boolean = {
log.debug("Remote client reconnecting to [{}]", remoteAddress) log.debug("Remote client reconnecting to [{}]", remoteAddress)
val connection = bootstrap.connect(remoteAddress) val connection = bootstrap.connect(new InetSocketAddress(remoteAddress.hostname, remoteAddress.port))
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) { if (!connection.isSuccess) {
@ -190,7 +182,7 @@ class ActiveRemoteClient private[akka] (
log.debug("Starting remote client connection to [{}]", remoteAddress) log.debug("Starting remote client connection to [{}]", remoteAddress)
connection = bootstrap.connect(remoteAddress) connection = bootstrap.connect(new InetSocketAddress(remoteAddress.hostname, remoteAddress.port))
val channel = connection.awaitUninterruptibly.getChannel val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel) openChannels.add(channel)
@ -253,7 +245,7 @@ class ActiveRemoteClient private[akka] (
class ActiveRemoteClientPipelineFactory( class ActiveRemoteClientPipelineFactory(
name: String, name: String,
bootstrap: ClientBootstrap, bootstrap: ClientBootstrap,
remoteAddress: InetSocketAddress, remoteAddress: RemoteAddress,
timer: HashedWheelTimer, timer: HashedWheelTimer,
client: ActiveRemoteClient) extends ChannelPipelineFactory { client: ActiveRemoteClient) extends ChannelPipelineFactory {
@ -278,7 +270,7 @@ class ActiveRemoteClientPipelineFactory(
class ActiveRemoteClientHandler( class ActiveRemoteClientHandler(
val name: String, val name: String,
val bootstrap: ClientBootstrap, val bootstrap: ClientBootstrap,
val remoteAddress: InetSocketAddress, val remoteAddress: RemoteAddress,
val timer: HashedWheelTimer, val timer: HashedWheelTimer,
val client: ActiveRemoteClient) val client: ActiveRemoteClient)
extends SimpleChannelUpstreamHandler { extends SimpleChannelUpstreamHandler {
@ -366,26 +358,25 @@ class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with
protected[akka] def send(message: Any, protected[akka] def send(message: Any,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
recipientAddress: InetSocketAddress, recipientAddress: RemoteAddress,
recipient: ActorRef, recipient: ActorRef,
loader: Option[ClassLoader]): Unit = { loader: Option[ClassLoader]): Unit = {
val key = RemoteAddress(recipientAddress)
clientsLock.readLock.lock clientsLock.readLock.lock
try { try {
val client = remoteClients.get(key) match { val client = remoteClients.get(recipientAddress) match {
case Some(client) client case Some(client) client
case None case None
clientsLock.readLock.unlock clientsLock.readLock.unlock
clientsLock.writeLock.lock //Lock upgrade, not supported natively clientsLock.writeLock.lock //Lock upgrade, not supported natively
try { try {
try { try {
remoteClients.get(key) match { remoteClients.get(recipientAddress) match {
//Recheck for addition, race between upgrades //Recheck for addition, race between upgrades
case Some(client) client //If already populated by other writer case Some(client) client //If already populated by other writer
case None //Populate map case None //Populate map
val client = new ActiveRemoteClient(this, recipientAddress, loader) val client = new ActiveRemoteClient(this, recipientAddress, loader)
client.connect() client.connect()
remoteClients += key -> client remoteClients += recipientAddress -> client
client client
} }
} finally { } finally {
@ -401,31 +392,30 @@ class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with
} }
} }
def bindClient(inetAddress: InetSocketAddress, client: RemoteClient, putIfAbsent: Boolean = false): Boolean = clientsLock withWriteGuard { def bindClient(remoteAddress: RemoteAddress, client: RemoteClient, putIfAbsent: Boolean = false): Boolean = clientsLock withWriteGuard {
val address = RemoteAddress(inetAddress) if (putIfAbsent && remoteClients.contains(remoteAddress)) false
if (putIfAbsent && remoteClients.contains(address)) false
else { else {
client.connect() client.connect()
remoteClients.put(address, client).foreach(_.shutdown()) remoteClients.put(remoteAddress, client).foreach(_.shutdown())
true true
} }
} }
def unbindClient(inetAddress: InetSocketAddress): Unit = clientsLock withWriteGuard { def unbindClient(remoteAddress: RemoteAddress): Unit = clientsLock withWriteGuard {
remoteClients.foreach { remoteClients.foreach {
case (k, v) if (v.isBoundTo(inetAddress)) { v.shutdown(); remoteClients.remove(k) } case (k, v) if (v.isBoundTo(remoteAddress)) { v.shutdown(); remoteClients.remove(k) }
} }
} }
def shutdownClientConnection(address: InetSocketAddress): Boolean = clientsLock withWriteGuard { def shutdownClientConnection(remoteAddress: RemoteAddress): Boolean = clientsLock withWriteGuard {
remoteClients.remove(RemoteAddress(address)) match { remoteClients.remove(remoteAddress) match {
case Some(client) client.shutdown() case Some(client) client.shutdown()
case None false case None false
} }
} }
def restartClientConnection(address: InetSocketAddress): Boolean = clientsLock withReadGuard { def restartClientConnection(remoteAddress: RemoteAddress): Boolean = clientsLock withReadGuard {
remoteClients.get(RemoteAddress(address)) match { remoteClients.get(remoteAddress) match {
case Some(client) client.connect(reconnectIfAlreadyConnected = true) case Some(client) client.connect(reconnectIfAlreadyConnected = true)
case None false case None false
} }
@ -469,8 +459,9 @@ class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with
class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Option[ClassLoader]) { class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Option[ClassLoader]) {
val log = Logging(remoteSupport.app, this) val log = Logging(remoteSupport.app, this)
import remoteSupport.serverSettings._ import remoteSupport.serverSettings._
import remoteSupport.app.defaultAddress
val name = "NettyRemoteServer@" + remoteSupport.app.hostname + ":" + remoteSupport.app.port val name = "NettyRemoteServer@" + defaultAddress
private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool) private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
@ -487,7 +478,7 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio
bootstrap.setOption("child.reuseAddress", true) bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT.toMillis) bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT.toMillis)
openChannels.add(bootstrap.bind(remoteSupport.app.defaultAddress)) openChannels.add(bootstrap.bind(new InetSocketAddress(defaultAddress.hostname, defaultAddress.port)))
remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport)) remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport))
def shutdown() { def shutdown() {
@ -620,8 +611,8 @@ class RemoteServerHandler(
instruction.getCommandType match { instruction.getCommandType match {
case CommandType.CONNECT if USE_PASSIVE_CONNECTIONS case CommandType.CONNECT if USE_PASSIVE_CONNECTIONS
val origin = instruction.getOrigin val origin = instruction.getOrigin
val inbound = new InetSocketAddress(origin.getHostname, origin.getPort) val inbound = RemoteAddress(origin.getHostname, origin.getPort)
val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound, applicationLoader) val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound)
remoteSupport.bindClient(inbound, client) remoteSupport.bindClient(inbound, client)
case CommandType.SHUTDOWN //TODO FIXME Dispose passive connection here case CommandType.SHUTDOWN //TODO FIXME Dispose passive connection here
case _ //Unknown command case _ //Unknown command
@ -637,9 +628,9 @@ class RemoteServerHandler(
event.getChannel.close event.getChannel.close
} }
private def getClientAddress(c: Channel): Option[InetSocketAddress] = private def getClientAddress(c: Channel): Option[RemoteAddress] =
c.getRemoteAddress match { c.getRemoteAddress match {
case inet: InetSocketAddress Some(inet) case inet: InetSocketAddress Some(RemoteAddress(inet))
case _ None case _ None
} }
} }

View file

@ -8,10 +8,10 @@ import java.net.InetSocketAddress
class AccrualFailureDetectorSpec extends WordSpec with MustMatchers { class AccrualFailureDetectorSpec extends WordSpec with MustMatchers {
"An AccrualFailureDetector" should { "An AccrualFailureDetector" should {
val conn = RemoteAddress(new InetSocketAddress("localhost", 2552))
"mark node as available after a series of successful heartbeats" in { "mark node as available after a series of successful heartbeats" in {
val fd = new AccrualFailureDetector val fd = new AccrualFailureDetector
val conn = new InetSocketAddress("localhost", 2552)
fd.heartbeat(conn) fd.heartbeat(conn)
@ -27,7 +27,6 @@ class AccrualFailureDetectorSpec extends WordSpec with MustMatchers {
// FIXME how should we deal with explicit removal of connection? - if triggered as failure then we have a problem in boostrap - see line 142 in AccrualFailureDetector // FIXME how should we deal with explicit removal of connection? - if triggered as failure then we have a problem in boostrap - see line 142 in AccrualFailureDetector
"mark node as dead after explicit removal of connection" ignore { "mark node as dead after explicit removal of connection" ignore {
val fd = new AccrualFailureDetector val fd = new AccrualFailureDetector
val conn = new InetSocketAddress("localhost", 2552)
fd.heartbeat(conn) fd.heartbeat(conn)
@ -46,7 +45,6 @@ class AccrualFailureDetectorSpec extends WordSpec with MustMatchers {
"mark node as dead if heartbeat are missed" in { "mark node as dead if heartbeat are missed" in {
val fd = new AccrualFailureDetector(threshold = 3) val fd = new AccrualFailureDetector(threshold = 3)
val conn = new InetSocketAddress("localhost", 2552)
fd.heartbeat(conn) fd.heartbeat(conn)
@ -65,7 +63,6 @@ class AccrualFailureDetectorSpec extends WordSpec with MustMatchers {
"mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in {
val fd = new AccrualFailureDetector(threshold = 3) val fd = new AccrualFailureDetector(threshold = 3)
val conn = new InetSocketAddress("localhost", 2552)
fd.heartbeat(conn) fd.heartbeat(conn)