2011-08-29 12:02:12 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package akka.cluster
|
|
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
import akka.actor.{ Actor, ActorRef, Props }
|
2011-08-29 14:26:24 +02:00
|
|
|
import Actor._
|
2011-08-29 12:02:12 +02:00
|
|
|
import akka.cluster._
|
2011-08-30 14:31:59 +02:00
|
|
|
import akka.routing._
|
|
|
|
|
import akka.event.EventHandler
|
2011-08-29 14:26:24 +02:00
|
|
|
import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher }
|
2011-08-29 12:02:12 +02:00
|
|
|
import akka.util.ListenerManagement
|
|
|
|
|
|
|
|
|
|
import scala.collection.mutable.{ HashMap, Set }
|
2011-08-30 14:31:59 +02:00
|
|
|
import scala.annotation.tailrec
|
2011-08-29 12:02:12 +02:00
|
|
|
|
|
|
|
|
import java.net.InetSocketAddress
|
2011-08-30 14:31:59 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicReference
|
2011-08-29 12:02:12 +02:00
|
|
|
|
|
|
|
|
object FailureDetector {
|
|
|
|
|
|
|
|
|
|
private sealed trait FailureDetectorEvent
|
2011-08-30 14:31:59 +02:00
|
|
|
private case class Register(strategy: RemoteFailureListener, address: InetSocketAddress) extends FailureDetectorEvent
|
|
|
|
|
private case class Unregister(strategy: RemoteFailureListener, address: InetSocketAddress) extends FailureDetectorEvent
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-29 14:26:24 +02:00
|
|
|
private[akka] val registry = actorOf(Props(new Registry).copy(dispatcher = new PinnedDispatcher(), localOnly = true))
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
def register(strategy: RemoteFailureListener, address: InetSocketAddress) = registry ! Register(strategy, address)
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
def unregister(strategy: RemoteFailureListener, address: InetSocketAddress) = registry ! Unregister(strategy, address)
|
2011-08-29 12:02:12 +02:00
|
|
|
|
|
|
|
|
private class Registry extends Actor {
|
|
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
val strategies = new HashMap[InetSocketAddress, Set[RemoteFailureListener]]() {
|
|
|
|
|
override def default(k: InetSocketAddress) = Set.empty[RemoteFailureListener]
|
2011-08-29 12:02:12 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def receive = {
|
|
|
|
|
case event: RemoteClientLifeCycleEvent ⇒
|
|
|
|
|
strategies(event.remoteAddress) foreach (_ notify event)
|
|
|
|
|
|
|
|
|
|
case event: RemoteServerLifeCycleEvent ⇒ // FIXME handle RemoteServerLifeCycleEvent
|
|
|
|
|
|
|
|
|
|
case Register(strategy, address) ⇒
|
|
|
|
|
strategies(address) += strategy
|
|
|
|
|
|
|
|
|
|
case Unregister(strategy, address) ⇒
|
|
|
|
|
strategies(address) -= strategy
|
|
|
|
|
|
|
|
|
|
case _ ⇒ //ignore other
|
|
|
|
|
}
|
|
|
|
|
}
|
2011-08-30 14:31:59 +02:00
|
|
|
}
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
abstract class FailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef]) extends FailureDetector {
|
|
|
|
|
import ClusterActorRef._
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
// type C
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
private val state = new AtomicReference[State]()
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
state.set(new State(Long.MinValue, initialConnections))
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
def version: Long = state.get().version
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
def versionedIterable = state.get
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
def size: Int = state.get.iterable.size
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
def connections: Map[InetSocketAddress, ActorRef] = state.get.connections
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
def stopAll() {
|
|
|
|
|
state.get().connections.values foreach (_.stop()) // shut down all remote connections
|
|
|
|
|
}
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
@tailrec
|
|
|
|
|
final def failOver(from: InetSocketAddress, to: InetSocketAddress) {
|
|
|
|
|
EventHandler.debug(this, "ClusterActorRef failover from [%s] to [%s]".format(from, to))
|
|
|
|
|
|
|
|
|
|
val oldState = state.get
|
|
|
|
|
var changed = false
|
|
|
|
|
val newMap = oldState.connections map {
|
|
|
|
|
case (`from`, actorRef) ⇒
|
|
|
|
|
changed = true
|
|
|
|
|
//actorRef.stop()
|
|
|
|
|
(to, createRemoteActorRef(actorRef.address, to))
|
|
|
|
|
case other ⇒ other
|
2011-08-29 12:02:12 +02:00
|
|
|
}
|
|
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
if (changed) {
|
|
|
|
|
//there was a state change, so we are now going to update the state.
|
|
|
|
|
val newState = new State(oldState.version + 1, newMap)
|
|
|
|
|
|
|
|
|
|
//if we are not able to update, the state, we are going to try again.
|
|
|
|
|
if (!state.compareAndSet(oldState, newState)) failOver(from, to)
|
2011-08-29 12:02:12 +02:00
|
|
|
}
|
2011-08-30 14:31:59 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@tailrec
|
|
|
|
|
final def remove(deadRef: ActorRef) {
|
|
|
|
|
EventHandler.debug(this, "ClusterActorRef remove [%s]".format(deadRef.uuid))
|
|
|
|
|
|
|
|
|
|
val oldState = state.get()
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
var changed = false
|
|
|
|
|
|
|
|
|
|
//remote the deadRef from the clustered-connections.
|
|
|
|
|
var newConnections = Map.empty[InetSocketAddress, ActorRef]
|
|
|
|
|
oldState.connections.keys foreach { address ⇒
|
|
|
|
|
val actorRef: ActorRef = oldState.connections.get(address).get
|
|
|
|
|
if (actorRef ne deadRef) {
|
|
|
|
|
newConnections = newConnections + ((address, actorRef))
|
|
|
|
|
} else {
|
|
|
|
|
changed = true
|
|
|
|
|
}
|
2011-08-29 12:02:12 +02:00
|
|
|
}
|
|
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
if (changed) {
|
|
|
|
|
//one or more occurrances of the actorRef were removed, so we need to update the state.
|
|
|
|
|
val newState = new State(oldState.version + 1, newConnections)
|
|
|
|
|
|
|
|
|
|
//if we are not able to update the state, we just try again.
|
|
|
|
|
if (!state.compareAndSet(oldState, newState)) remove(deadRef)
|
2011-08-29 12:02:12 +02:00
|
|
|
}
|
2011-08-30 14:31:59 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class State(version: Long = Integer.MIN_VALUE,
|
|
|
|
|
val connections: Map[InetSocketAddress, ActorRef])
|
|
|
|
|
extends VersionedIterable[ActorRef](version, connections.values)
|
2011-08-29 12:02:12 +02:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
// class State[C](version: Long = Integer.MIN_VALUE,
|
|
|
|
|
// val connections: Map[InetSocketAddress, ActorRef],
|
|
|
|
|
// val explicitConnections: Iterable[ActorRef],
|
|
|
|
|
// val context: C)
|
|
|
|
|
// extends VersionedIterable[ActorRef](version, explicitConnections ++ connections.values)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trait RemoteFailureListener {
|
|
|
|
|
|
|
|
|
|
def notify(event: RemoteLifeCycleEvent) = event match {
|
|
|
|
|
case RemoteClientWriteFailed(request, cause, client, address) ⇒
|
|
|
|
|
remoteClientWriteFailed(request, cause, client, address)
|
|
|
|
|
println("--------->>> RemoteClientWriteFailed")
|
|
|
|
|
case RemoteClientError(cause, client, address) ⇒
|
|
|
|
|
println("--------->>> RemoteClientError")
|
|
|
|
|
remoteClientError(cause, client, address)
|
|
|
|
|
case RemoteClientDisconnected(client, address) ⇒
|
|
|
|
|
remoteClientDisconnected(client, address)
|
|
|
|
|
println("--------->>> RemoteClientDisconnected")
|
|
|
|
|
case RemoteClientShutdown(client, address) ⇒
|
|
|
|
|
remoteClientShutdown(client, address)
|
|
|
|
|
println("--------->>> RemoteClientShutdown")
|
|
|
|
|
case RemoteServerWriteFailed(request, cause, server, clientAddress) ⇒
|
|
|
|
|
remoteServerWriteFailed(request, cause, server, clientAddress)
|
|
|
|
|
case RemoteServerError(cause, server) ⇒
|
|
|
|
|
remoteServerError(cause, server)
|
|
|
|
|
case RemoteServerShutdown(server) ⇒
|
|
|
|
|
remoteServerShutdown(server)
|
2011-08-29 12:02:12 +02:00
|
|
|
}
|
|
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
def remoteClientWriteFailed(
|
|
|
|
|
request: AnyRef, cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {}
|
|
|
|
|
|
|
|
|
|
def remoteClientError(cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {}
|
|
|
|
|
|
|
|
|
|
def remoteClientDisconnected(client: RemoteClientModule, address: InetSocketAddress) {}
|
|
|
|
|
|
|
|
|
|
def remoteClientShutdown(client: RemoteClientModule, address: InetSocketAddress) {}
|
|
|
|
|
|
|
|
|
|
def remoteServerWriteFailed(
|
|
|
|
|
request: AnyRef, cause: Throwable, server: RemoteServerModule, clientAddress: Option[InetSocketAddress]) {}
|
|
|
|
|
|
|
|
|
|
def remoteServerError(cause: Throwable, server: RemoteServerModule) {}
|
|
|
|
|
|
|
|
|
|
def remoteServerShutdown(server: RemoteServerModule) {}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class RemoveConnectionOnFirstFailureFailureDetector(initialConnections: Map[InetSocketAddress, ActorRef])
|
|
|
|
|
extends FailureDetectorBase(initialConnections)
|
|
|
|
|
with RemoteFailureListener {
|
|
|
|
|
|
|
|
|
|
override def remoteClientWriteFailed(
|
|
|
|
|
request: AnyRef, cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {
|
|
|
|
|
removeConnection(address)
|
2011-08-29 12:02:12 +02:00
|
|
|
}
|
|
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
override def remoteClientError(cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {
|
|
|
|
|
removeConnection(address)
|
2011-08-29 12:02:12 +02:00
|
|
|
}
|
|
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
override def remoteClientDisconnected(client: RemoteClientModule, address: InetSocketAddress) {
|
|
|
|
|
removeConnection(address)
|
2011-08-29 12:02:12 +02:00
|
|
|
}
|
2011-08-30 14:31:59 +02:00
|
|
|
|
|
|
|
|
override def remoteClientShutdown(client: RemoteClientModule, address: InetSocketAddress) {
|
|
|
|
|
removeConnection(address)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def removeConnection(address: InetSocketAddress) =
|
|
|
|
|
connections.get(address) foreach { connection ⇒ remove(connection) }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trait LinearBackoffRemoteFailureListener extends RemoteFailureListener {
|
2011-08-29 12:02:12 +02:00
|
|
|
}
|
2011-08-30 14:31:59 +02:00
|
|
|
|
|
|
|
|
trait ExponentialBackoffRemoteFailureListener extends RemoteFailureListener {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trait CircuitBreakerRemoteFailureListener extends RemoteFailureListener {
|
|
|
|
|
}
|