2011-10-07 15:42:55 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package akka.remote
|
|
|
|
|
|
|
|
|
|
import akka.actor._
|
|
|
|
|
import akka.routing._
|
2011-11-10 20:08:00 +01:00
|
|
|
import akka.actor.ActorSystem
|
2011-10-27 12:23:01 +02:00
|
|
|
import akka.event.Logging
|
2011-10-07 15:42:55 +02:00
|
|
|
|
|
|
|
|
import scala.collection.immutable.Map
|
|
|
|
|
import scala.annotation.tailrec
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicReference
|
|
|
|
|
|
|
|
|
|
/**
 * Remote connection manager, manages remote connections, e.g. RemoteActorRef's.
 *
 * The set of connections is held as an immutable [[State]] snapshot inside an
 * `AtomicReference`. Every mutating operation builds a new snapshot from the one it read
 * and installs it with `compareAndSet`, starting over when the reference was changed
 * concurrently.
 *
 * @param system             used to create the logger
 * @param remote             supplies the failure detector, and the provider/server handed to
 *                           connections created by `newConnection`
 * @param initialConnections connections contained in the initial state (empty by default)
 */
class RemoteConnectionManager(
  system: ActorSystemImpl,
  remote: Remote,
  initialConnections: Map[ParsedTransportAddress, ActorRef] = Map.empty[ParsedTransportAddress, ActorRef])
  extends ConnectionManager {

  val log = Logging(system, "RemoteConnectionManager")

  // FIXME is this VersionedIterable really needed? It is not used I think. Complicates API. See 'def connections' etc.
  /**
   * Immutable snapshot of the managed connections.
   *
   * @param version     counter that is incremented by one on every successful update
   * @param connections the connections of this snapshot, keyed by remote address
   */
  case class State(version: Long, connections: Map[ParsedTransportAddress, ActorRef])
    extends VersionedIterable[ActorRef] {
    def iterable: Iterable[ActorRef] = connections.values
  }

  /** The failure detector of the `remote` this manager was created with. */
  def failureDetector = remote.failureDetector

  private val state: AtomicReference[State] = new AtomicReference[State](newState())

  /**
   * This method is using the FailureDetector to filter out connections that are considered not available.
   *
   * The version of the returned state is the same as the version of `current`.
   */
  private def filterAvailableConnections(current: State): State =
    current.copy(connections = current.connections filter { case (address, _) ⇒ failureDetector.isAvailable(address) })

  /** Creates the initial state: the lowest possible version and the connections given to the constructor. */
  private def newState(): State = State(Long.MinValue, initialConnections)

  /** Version of the current state. */
  def version: Long = state.get.version

  // FIXME should not return State value but a Seq with connections
  /** Snapshot of the current state containing only the connections the failure detector reports as available. */
  def connections: State = filterAvailableConnections(state.get)

  /** Number of currently available connections. */
  def size: Int = connections.connections.size

  /**
   * Looks up the connection registered for `address`.
   *
   * @return the connection, or `None` when there is no connection for the address or the
   *         failure detector reports the address as not available
   */
  def connectionFor(address: ParsedTransportAddress): Option[ActorRef] =
    // Look the address up first so that the failure detector is only consulted for this one entry.
    state.get.connections.get(address) filter (_ ⇒ failureDetector.isAvailable(address))

  /** `true` when there is no available connection. */
  def isEmpty: Boolean = connections.connections.isEmpty

  /**
   * Stops every connection of the current state, including those the failure detector
   * reports as not available. The state itself is not modified.
   */
  def shutdown(): Unit = {
    state.get.iterable foreach (_.stop()) // shut down all remote connections
  }

  /**
   * Replaces the connection registered for `from` with a new connection registered for `to`.
   * The new connection is created with the actor path of the connection it replaces.
   * Nothing happens when there is no connection for `from`.
   *
   * If a connection is already registered for `to`, the newly created connection takes its place.
   *
   * @param from address whose connection is replaced
   * @param to   address the new connection is registered for
   */
  @tailrec
  final def failOver(from: ParsedTransportAddress, to: ParsedTransportAddress): Unit = {
    log.debug("Failing over connection from [{}] to [{}]", from, to)

    val oldState = state.get

    oldState.connections.get(from) match {
      case Some(actorRef) ⇒
        //actorRef.stop()
        val newMap = (oldState.connections - from) + (to -> newConnection(to, actorRef.path))

        //there was a state change, so we are now going to update the state.
        val updated = oldState.copy(version = oldState.version + 1, connections = newMap)

        //if we are not able to update the state, we are going to try again.
        if (!state.compareAndSet(oldState, updated)) {
          failOver(from, to) // recur
        }

      case None ⇒ () // no connection registered for `from`, nothing to fail over
    }
  }

  /**
   * Removes every entry whose connection is the very same instance (reference equality)
   * as `faultyConnection`. Nothing happens when the connection is not registered.
   * The removed connection is not stopped by this method.
   *
   * @param faultyConnection the connection to remove
   */
  @tailrec
  final def remove(faultyConnection: ActorRef): Unit = {
    val oldState = state.get

    val (kept, removed) = oldState.connections partition { case (_, actorRef) ⇒ actorRef ne faultyConnection }

    if (removed.nonEmpty) {
      //one or more occurrences of the actorRef were removed, so we need to update the state.
      val updated = oldState.copy(version = oldState.version + 1, connections = kept)

      //if we are not able to update the state, we just try again.
      if (!state.compareAndSet(oldState, updated)) {
        remove(faultyConnection) // recur
      } else {
        removed.keys foreach { address ⇒ log.debug("Removing connection [{}]", address) }
      }
    }
  }

  /**
   * Returns the connection registered for `address`, creating and registering one with
   * `newConnectionFactory` when there is none yet.
   *
   * The factory may be invoked more than once: a connection that could not be registered
   * because the state was changed concurrently is stopped before the operation is retried.
   *
   * @param address              address the connection is registered for
   * @param newConnectionFactory creates the connection when none is registered
   * @return the connection that is registered for `address`
   */
  @tailrec
  final def putIfAbsent(address: ParsedTransportAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = {
    val oldState = state.get
    val oldConnections = oldState.connections

    oldConnections.get(address) match {
      case Some(connection) ⇒ connection // we already had the connection, return it
      case None ⇒ // we need to create it
        val created = newConnectionFactory()

        //a connection was added, so we need to update the state.
        val updated = oldState.copy(version = oldState.version + 1, connections = oldConnections + (address -> created))

        //if we are not able to update the state, we just try again.
        if (!state.compareAndSet(oldState, updated)) {
          // we failed, need compensating action
          created.stop() // stop the new connection actor and try again
          putIfAbsent(address, newConnectionFactory) // recur
        } else {
          // we succeeded
          log.debug("Adding connection [{}]", address)
          created // return new connection actor
        }
    }
  }

  /**
   * Creates a new RemoteActorRef for `actorPath` using the provider and server of `remote`.
   *
   * NOTE(review): `remoteAddress` is not used by this method.
   */
  private[remote] def newConnection(remoteAddress: ParsedTransportAddress, actorPath: ActorPath) =
    new RemoteActorRef(remote.provider, remote.server, actorPath, Nobody, None)
}
|