From 1663bf4ac14327e25cec68f9acfa3d42ef6cdbc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Thu, 8 Sep 2011 19:48:11 +0200 Subject: [PATCH] Rewrote and abstracted remote failure detection and added BannagePeriodFailureDetector. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/cluster/Cluster.scala | 2 +- .../akka/cluster/RemoteFailureDetector.scala | 323 +++++++++++++----- 2 files changed, 247 insertions(+), 78 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 7d87333f29..6bdf285e58 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -428,7 +428,7 @@ class DefaultClusterNode private[akka] ( remoteService.shutdown() // shutdown server - RemoteFailureDetector.registry.stop() + RemoteFailureDetector.channel.stop() remoteClientLifeCycleHandler.stop() remoteDaemon.stop() diff --git a/akka-cluster/src/main/scala/akka/cluster/RemoteFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/RemoteFailureDetector.scala index 84bc6a4d16..634f683c61 100644 --- a/akka-cluster/src/main/scala/akka/cluster/RemoteFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/RemoteFailureDetector.scala @@ -10,30 +10,41 @@ import akka.cluster._ import akka.routing._ import akka.event.EventHandler import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher } -import akka.util.ListenerManagement +import akka.util.{ ListenerManagement, Duration } -import scala.collection.mutable.{ HashMap, Set } +import scala.collection.immutable.Map +import scala.collection.mutable import scala.annotation.tailrec import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference +import System.{ currentTimeMillis ⇒ newTimestamp } +/** + * Holds error event channel Actor instance and 
provides API for channel listener management. + */ object RemoteFailureDetector { private sealed trait RemoteFailureDetectorChannelEvent - private case class Register(listener: RemoteFailureListener, address: InetSocketAddress) extends RemoteFailureDetectorChannelEvent - private case class Unregister(listener: RemoteFailureListener, address: InetSocketAddress) extends RemoteFailureDetectorChannelEvent + + private case class Register(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) + extends RemoteFailureDetectorChannelEvent + + private case class Unregister(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) + extends RemoteFailureDetectorChannelEvent private[akka] val channel = actorOf(Props(new Channel).copy(dispatcher = new PinnedDispatcher(), localOnly = true)) - def register(listener: RemoteFailureListener, address: InetSocketAddress) = channel ! Register(listener, address) + def register(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) = + channel ! Register(listener, connectionAddress) - def unregister(listener: RemoteFailureListener, address: InetSocketAddress) = channel ! Unregister(listener, address) + def unregister(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) = + channel ! 
Unregister(listener, connectionAddress) private class Channel extends Actor { - val listeners = new HashMap[InetSocketAddress, Set[RemoteFailureListener]]() { - override def default(k: InetSocketAddress) = Set.empty[RemoteFailureListener] + val listeners = new mutable.HashMap[InetSocketAddress, mutable.Set[RemoteFailureListener]]() { + override def default(k: InetSocketAddress) = mutable.Set.empty[RemoteFailureListener] } def receive = { @@ -42,30 +53,67 @@ object RemoteFailureDetector { case event: RemoteServerLifeCycleEvent ⇒ // FIXME handle RemoteServerLifeCycleEvent - case Register(listener, address) ⇒ - listeners(address) += listener + case Register(listener, connectionAddress) ⇒ + listeners(connectionAddress) += listener - case Unregister(listener, address) ⇒ - listeners(address) -= listener + case Unregister(listener, connectionAddress) ⇒ + listeners(connectionAddress) -= listener case _ ⇒ //ignore other } } } -abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef]) extends FailureDetector { +/** + * Base class for remote failure detection management. + */ +abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef]) + extends FailureDetector + with RemoteFailureListener { + import ClusterActorRef._ - case class State(val version: Long = Integer.MIN_VALUE, val connections: Map[InetSocketAddress, ActorRef]) + type T <: AnyRef + + protected case class State( + version: Long, + connections: Map[InetSocketAddress, ActorRef], + meta: T = null.asInstanceOf[T]) extends VersionedIterable[ActorRef] { def iterable: Iterable[ActorRef] = connections.values } - // type C + protected val state: AtomicReference[State] = { + val ref = new AtomicReference[State] + ref set newState() + ref + } - private val state = new AtomicReference[State]() + /** + * State factory. To be defined by subclass that wants to add extra info in the 'meta: T' field. 
+ */ + protected def newState(): State - state.set(State(Long.MinValue, initialConnections)) + /** + * Returns true if the 'connection' is considered available. + * + * To be implemented by subclass. + */ + def isAvailable(connectionAddress: InetSocketAddress): Boolean + + /** + * Records a successful connection. + * + * To be implemented by subclass. + */ + def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) + + /** + * Records a failed connection. + * + * To be implemented by subclass. + */ + def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) def version: Long = state.get.version @@ -85,6 +133,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre val oldState = state.get var changed = false + val newMap = oldState.connections map { case (`from`, actorRef) ⇒ changed = true @@ -95,7 +144,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre if (changed) { //there was a state change, so we are now going to update the state. - val newState = State(oldState.version + 1, newMap) + val newState = oldState copy (version = oldState.version + 1, connections = newMap) //if we are not able to update, the state, we are going to try again. if (!state.compareAndSet(oldState, newState)) failOver(from, to) @@ -107,7 +156,6 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre EventHandler.debug(this, "ClusterActorRef remove [%s]".format(faultyConnection.uuid)) val oldState = state.get() - var changed = false //remote the faultyConnection from the clustered-connections. @@ -123,7 +171,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre if (changed) { //one or more occurrances of the actorRef were removed, so we need to update the state. 
- val newState = State(oldState.version + 1, newConnections) + val newState = oldState copy (version = oldState.version + 1, connections = newConnections) //if we are not able to update the state, we just try again. if (!state.compareAndSet(oldState, newState)) remove(faultyConnection) @@ -131,24 +179,181 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre } } +/** + * Simple failure detector that removes the failing connection permanently on first error. + */ +class RemoveConnectionOnFirstFailureRemoteFailureDetector( + initialConnections: Map[InetSocketAddress, ActorRef]) + extends RemoteFailureDetectorBase(initialConnections) { + + protected def newState() = State(Long.MinValue, initialConnections) + + def isAvailable(connectionAddress: InetSocketAddress): Boolean = connections.get(connectionAddress).isDefined + + def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) {} + + def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) {} + + override def remoteClientWriteFailed( + request: AnyRef, cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) { + removeConnection(connectionAddress) + } + + override def remoteClientError(cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) { + removeConnection(connectionAddress) + } + + override def remoteClientDisconnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) { + removeConnection(connectionAddress) + } + + override def remoteClientShutdown(client: RemoteClientModule, connectionAddress: InetSocketAddress) { + removeConnection(connectionAddress) + } + + private def removeConnection(connectionAddress: InetSocketAddress) = + connections.get(connectionAddress) foreach { conn ⇒ remove(conn) } +} + +/** + * Failure detector that bans the failing connection for 'timeToBan: Duration' and will try to use the connection + * again after the ban period has expired. 
+ */ +class BannagePeriodFailureDetector( + initialConnections: Map[InetSocketAddress, ActorRef], + timeToBan: Duration) + extends RemoteFailureDetectorBase(initialConnections) { + + type T = Map[InetSocketAddress, BannedConnection] + + case class BannedConnection(bannedSince: Long, connection: ActorRef) + + val timeToBanInMillis = timeToBan.toMillis + + protected def newState() = + State(Long.MinValue, initialConnections, Map.empty[InetSocketAddress, BannedConnection]) + + private def removeConnection(connectionAddress: InetSocketAddress) = + connections.get(connectionAddress) foreach { conn ⇒ remove(conn) } + + // =================================================================================== + // FailureDetector callbacks + // =================================================================================== + + def isAvailable(connectionAddress: InetSocketAddress): Boolean = connections.get(connectionAddress).isDefined + + @tailrec + final def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) { + val oldState = state.get + val bannedConnection = oldState.meta.get(connectionAddress) + + if (bannedConnection.isDefined) { + val BannedConnection(bannedSince, connection) = bannedConnection.get + val currentlyBannedFor = newTimestamp - bannedSince + + if (currentlyBannedFor > timeToBanInMillis) { + // ban time has expired - add connection to available connections + val newConnections = oldState.connections + (connectionAddress -> connection) + val newBannedConnections = oldState.meta - connectionAddress + + val newState = oldState copy (version = oldState.version + 1, + connections = newConnections, + meta = newBannedConnections) + + if (!state.compareAndSet(oldState, newState)) recordSuccess(connectionAddress, timestamp) + } + } + } + + @tailrec + final def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) { + val oldState = state.get + val connection = oldState.connections.get(connectionAddress) + + if 
(connection.isDefined) { + val newConnections = oldState.connections - connectionAddress + val bannedConnection = BannedConnection(timestamp, connection.get) + val newBannedConnections = oldState.meta + (connectionAddress -> bannedConnection) + + val newState = oldState copy (version = oldState.version + 1, + connections = newConnections, + meta = newBannedConnections) + + if (!state.compareAndSet(oldState, newState)) recordFailure(connectionAddress, timestamp) + } + } + + // =================================================================================== + // RemoteFailureListener callbacks + // =================================================================================== + + override def remoteClientStarted(client: RemoteClientModule, connectionAddress: InetSocketAddress) { + recordSuccess(connectionAddress, newTimestamp) + } + + override def remoteClientConnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) { + recordSuccess(connectionAddress, newTimestamp) + } + + override def remoteClientWriteFailed( + request: AnyRef, cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) { + recordFailure(connectionAddress, newTimestamp) + } + + override def remoteClientError(cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) { + recordFailure(connectionAddress, newTimestamp) + } + + override def remoteClientDisconnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) { + recordFailure(connectionAddress, newTimestamp) + } + + override def remoteClientShutdown(client: RemoteClientModule, connectionAddress: InetSocketAddress) { + recordFailure(connectionAddress, newTimestamp) + } +} + +/** + * Failure detector that uses the Circuit Breaker pattern to detect and recover from failing connections. 
+ * + * class CircuitBreakerRemoteFailureListener(initialConnections: Map[InetSocketAddress, ActorRef]) + * extends RemoteFailureDetectorBase(initialConnections) { + * + * def newState() = State(Long.MinValue, initialConnections, None) + * + * def isAvailable(connectionAddress: InetSocketAddress): Boolean = connections.get(connectionAddress).isDefined + * + * def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) {} + * + * def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) {} + * + * // FIXME implement CircuitBreakerRemoteFailureListener + * } + */ + +/** + * Base trait for remote failure event listener. + */ trait RemoteFailureListener { - final def notify(event: RemoteLifeCycleEvent) = event match { - case RemoteClientWriteFailed(request, cause, client, address) ⇒ - remoteClientWriteFailed(request, cause, client, address) - println("--------->>> RemoteClientWriteFailed") + final private[akka] def notify(event: RemoteLifeCycleEvent) = event match { + case RemoteClientStarted(client, connectionAddress) ⇒ + remoteClientStarted(client, connectionAddress) - case RemoteClientError(cause, client, address) ⇒ - println("--------->>> RemoteClientError") - remoteClientError(cause, client, address) + case RemoteClientConnected(client, connectionAddress) ⇒ + remoteClientConnected(client, connectionAddress) - case RemoteClientDisconnected(client, address) ⇒ - remoteClientDisconnected(client, address) - println("--------->>> RemoteClientDisconnected") + case RemoteClientWriteFailed(request, cause, client, connectionAddress) ⇒ + remoteClientWriteFailed(request, cause, client, connectionAddress) - case RemoteClientShutdown(client, address) ⇒ - remoteClientShutdown(client, address) - println("--------->>> RemoteClientShutdown") + case RemoteClientError(cause, client, connectionAddress) ⇒ + remoteClientError(cause, client, connectionAddress) + + case RemoteClientDisconnected(client, connectionAddress) ⇒ + remoteClientDisconnected(client, 
connectionAddress) + + case RemoteClientShutdown(client, connectionAddress) ⇒ + remoteClientShutdown(client, connectionAddress) case RemoteServerWriteFailed(request, cause, server, clientAddress) ⇒ remoteServerWriteFailed(request, cause, server, clientAddress) @@ -160,14 +365,18 @@ trait RemoteFailureListener { remoteServerShutdown(server) } + def remoteClientStarted(client: RemoteClientModule, connectionAddress: InetSocketAddress) {} + + def remoteClientConnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {} + def remoteClientWriteFailed( - request: AnyRef, cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {} + request: AnyRef, cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - def remoteClientError(cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {} + def remoteClientError(cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - def remoteClientDisconnected(client: RemoteClientModule, address: InetSocketAddress) {} + def remoteClientDisconnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - def remoteClientShutdown(client: RemoteClientModule, address: InetSocketAddress) {} + def remoteClientShutdown(client: RemoteClientModule, connectionAddress: InetSocketAddress) {} def remoteServerWriteFailed( request: AnyRef, cause: Throwable, server: RemoteServerModule, clientAddress: Option[InetSocketAddress]) {} @@ -176,43 +385,3 @@ trait RemoteFailureListener { def remoteServerShutdown(server: RemoteServerModule) {} } - -class RemoveConnectionOnFirstFailureRemoteFailureDetector(initialConnections: Map[InetSocketAddress, ActorRef]) - extends RemoteFailureDetectorBase(initialConnections) - with RemoteFailureListener { - - override def remoteClientWriteFailed( - request: AnyRef, cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) { - removeConnection(address) - } - - override def 
remoteClientError(cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) { - removeConnection(address) - } - - override def remoteClientDisconnected(client: RemoteClientModule, address: InetSocketAddress) { - removeConnection(address) - } - - override def remoteClientShutdown(client: RemoteClientModule, address: InetSocketAddress) { - removeConnection(address) - } - - private def removeConnection(address: InetSocketAddress) = - connections.get(address) foreach { connection ⇒ remove(connection) } -} - -trait LinearBackoffRemoteFailureListener(initialConnections: Map[InetSocketAddress, ActorRef]) - extends RemoteFailureDetectorBase(initialConnections) - with RemoteFailureListener { -} - -trait ExponentialBackoffRemoteFailureListener(initialConnections: Map[InetSocketAddress, ActorRef]) - extends RemoteFailureDetectorBase(initialConnections) - with RemoteFailureListener { -} - -trait CircuitBreakerRemoteFailureListener(initialConnections: Map[InetSocketAddress, ActorRef]) - extends RemoteFailureDetectorBase(initialConnections) - with RemoteFailureListener { -} \ No newline at end of file