diff --git a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala index 4c0251ad10..9d435c06e1 100644 --- a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala @@ -10,6 +10,8 @@ import java.util.concurrent.atomic.AtomicReference import scala.collection.immutable.Map import scala.annotation.tailrec +import System.{ currentTimeMillis ⇒ newTimestamp } + /** * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper: * [http://ddg.jaist.ac.jp/pub/HDY+04.pdf] @@ -22,7 +24,7 @@ import scala.annotation.tailrec */ class AccrualFailureDetector( val threshold: Int = 8, - val maxSampleSize: Int = 1000) extends FailureDetector { + val maxSampleSize: Int = 1000) { final val PhiFactor = 1.0 / math.log(10.0) @@ -163,8 +165,4 @@ class AccrualFailureDetector( if (!state.compareAndSet(oldState, newState)) remove(connection) // recur } } - - def recordSuccess(connection: InetSocketAddress, timestamp: Long) {} - def recordFailure(connection: InetSocketAddress, timestamp: Long) {} - def notify(event: RemoteLifeCycleEvent) {} } diff --git a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala deleted file mode 100644 index 07ec2fb95a..0000000000 --- a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala +++ /dev/null @@ -1,230 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.remote - -import akka.AkkaException -import akka.actor._ -import akka.event.EventHandler -import akka.config.ConfigurationException -import akka.actor.UntypedChannel._ -import akka.dispatch.Future -import akka.util.ReflectiveAccess -import akka.util.Duration - -import java.net.InetSocketAddress -import java.lang.reflect.InvocationTargetException -import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } - -import scala.collection.immutable.Map -import scala.collection.mutable -import scala.annotation.tailrec - -/** - * The failure detector uses different heuristics (depending on implementation) to try to detect and manage - * failed connections. - * - * @author Jonas Bonér - */ -trait FailureDetector extends NetworkEventStream.Listener { - - def newTimestamp: Long = System.currentTimeMillis - - /** - * Returns true if the 'connection' is considered available. - */ - def isAvailable(connection: InetSocketAddress): Boolean - - /** - * Records a successful connection. - */ - def recordSuccess(connection: InetSocketAddress, timestamp: Long) - - /** - * Records a failed connection. - */ - def recordFailure(connection: InetSocketAddress, timestamp: Long) -} - -/** - * Misc helper and factory methods for failure detection. - */ -object FailureDetector { - - def createCustomFailureDetector(implClass: String): FailureDetector = { - - ReflectiveAccess.createInstance( - implClass, - Array[Class[_]](), - Array[AnyRef]()) match { - case Right(failureDetector) ⇒ failureDetector - case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } - throw new ConfigurationException( - "Could not instantiate custom FailureDetector of [" + - implClass + "] due to: " + - cause, cause) - } - } -} - -/** - * No-op failure detector. Does not do anything. - */ -class NoOpFailureDetector extends FailureDetector { - - def isAvailable(connection: InetSocketAddress): Boolean = true - - def recordSuccess(connection: InetSocketAddress, timestamp: Long) {} - - def recordFailure(connection: InetSocketAddress, timestamp: Long) {} - - def notify(event: RemoteLifeCycleEvent) {} -} - -/** - * Simple failure detector that removes the failing connection permanently on first error. - */ -class RemoveConnectionOnFirstFailureFailureDetector extends FailureDetector { - - protected case class State(version: Long, banned: Set[InetSocketAddress]) - - protected val state: AtomicReference[State] = new AtomicReference[State](newState()) - - protected def newState() = State(Long.MinValue, Set.empty[InetSocketAddress]) - - def isAvailable(connectionAddress: InetSocketAddress): Boolean = state.get.banned.contains(connectionAddress) - - final def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) {} - - @tailrec - final def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) { - val oldState = state.get - if (!oldState.banned.contains(connectionAddress)) { - val newBannedConnections = oldState.banned + connectionAddress - val newState = oldState copy (version = oldState.version + 1, banned = newBannedConnections) - if (!state.compareAndSet(oldState, newState)) recordFailure(connectionAddress, timestamp) - } - } - - // NetworkEventStream.Listener callback - def notify(event: RemoteLifeCycleEvent) = event match { - case RemoteClientWriteFailed(request, cause, client, connectionAddress) ⇒ - recordFailure(connectionAddress, newTimestamp) - - case RemoteClientError(cause, client, connectionAddress) ⇒ - recordFailure(connectionAddress, newTimestamp) - - case RemoteClientDisconnected(client, connectionAddress) ⇒ - recordFailure(connectionAddress, newTimestamp) - - case RemoteClientShutdown(client, connectionAddress) ⇒ - recordFailure(connectionAddress, newTimestamp) - - case _ ⇒ {} - } -} - -/** - * Failure detector that bans the failing connection for 'timeToBan: Duration' and will try to use the connection - * again after the ban period have expired. - * - * @author Jonas Bonér - */ -class BannagePeriodFailureDetector(timeToBan: Duration) extends FailureDetector with NetworkEventStream.Listener { - - // FIXME considering adding a Scheduler event to notify the BannagePeriodFailureDetector unban the banned connection after the timeToBan have exprired - - protected case class State(version: Long, banned: Map[InetSocketAddress, BannedConnection]) - - protected val state: AtomicReference[State] = new AtomicReference[State](newState()) - - case class BannedConnection(bannedSince: Long, address: InetSocketAddress) - - val timeToBanInMillis = timeToBan.toMillis - - protected def newState() = State(Long.MinValue, Map.empty[InetSocketAddress, BannedConnection]) - - private def bannedConnections = state.get.banned - - def isAvailable(connectionAddress: InetSocketAddress): Boolean = bannedConnections.get(connectionAddress).isEmpty - - @tailrec - final def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) { - val oldState = state.get - val bannedConnection = oldState.banned.get(connectionAddress) - - if (bannedConnection.isDefined) { // is it banned or not? - val BannedConnection(bannedSince, banned) = bannedConnection.get - val currentlyBannedFor = newTimestamp - bannedSince - - if (currentlyBannedFor > timeToBanInMillis) { - val newBannedConnections = oldState.banned - connectionAddress - - val newState = oldState copy (version = oldState.version + 1, banned = newBannedConnections) - - if (!state.compareAndSet(oldState, newState)) recordSuccess(connectionAddress, timestamp) - } - } - } - - @tailrec - final def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) { - val oldState = state.get - val connection = oldState.banned.get(connectionAddress) - - if (connection.isEmpty) { // is it already banned or not? - val bannedConnection = BannedConnection(timestamp, connectionAddress) - val newBannedConnections = oldState.banned + (connectionAddress -> bannedConnection) - - val newState = oldState copy (version = oldState.version + 1, banned = newBannedConnections) - - if (!state.compareAndSet(oldState, newState)) recordFailure(connectionAddress, timestamp) - } - } - - // NetworkEventStream.Listener callback - def notify(event: RemoteLifeCycleEvent) = event match { - case RemoteClientStarted(client, connectionAddress) ⇒ - recordSuccess(connectionAddress, newTimestamp) - - case RemoteClientConnected(client, connectionAddress) ⇒ - recordSuccess(connectionAddress, newTimestamp) - - case RemoteClientWriteFailed(request, cause, client, connectionAddress) ⇒ - recordFailure(connectionAddress, newTimestamp) - - case RemoteClientError(cause, client, connectionAddress) ⇒ - recordFailure(connectionAddress, newTimestamp) - - case RemoteClientDisconnected(client, connectionAddress) ⇒ - recordFailure(connectionAddress, newTimestamp) - - case RemoteClientShutdown(client, connectionAddress) ⇒ - recordFailure(connectionAddress, newTimestamp) - - case _ ⇒ {} - } -} - -/** - * Failure detector that uses the Circuit Breaker pattern to detect and recover from failing connections. - * - * class CircuitBreakerNetworkEventStream.Listener(initialConnections: Map[InetSocketAddress, ActorRef]) - * extends RemoteConnectionManager(initialConnections) { - * - * def newState() = State(Long.MinValue, initialConnections, None) - * - * def isAvailable(connectionAddress: InetSocketAddress): Boolean = connections.get(connectionAddress).isDefined - * - * def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) {} - * - * def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) {} - * - * // FIXME implement CircuitBreakerNetworkEventStream.Listener - * } - */ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index eb1f3b82d6..f8837ec4f4 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -35,9 +35,6 @@ class RemoteConnectionManager( private val state: AtomicReference[State] = new AtomicReference[State](newState()) - // register all initial connections - e.g listen to events from them - initialConnections.keys foreach (remote.eventStream.register(failureDetector, _)) - /** * This method is using the FailureDetector to filter out connections that are considered not available. */ @@ -117,7 +114,6 @@ class RemoteConnectionManager( remove(faultyConnection) // recur } else { app.eventHandler.debug(this, "Removing connection [%s]".format(faultyAddress)) - remote.eventStream.unregister(failureDetector, faultyAddress) // unregister the connections - e.g stop listen to events from it } } } @@ -145,7 +141,6 @@ class RemoteConnectionManager( } else { // we succeeded app.eventHandler.debug(this, "Adding connection [%s]".format(address)) - remote.eventStream.register(failureDetector, address) // register the connection - e.g listen to events from it newConnection // return new connection actor } }