Moved FailureDetector trait and utility companion object to its own file.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-09-08 19:46:05 +02:00
parent 72e0c60909
commit 47bfafe81e
2 changed files with 141 additions and 86 deletions

View file

@ -0,0 +1,127 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import akka.AkkaException
import akka.actor._
import akka.event.EventHandler
import akka.config.ConfigurationException
import akka.actor.UntypedChannel._
import akka.dispatch.{ Future, Futures }
import akka.util.ReflectiveAccess
import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
import scala.annotation.tailrec
/**
* Misc helper and factory methods for failure detection.
*/
object FailureDetector {
def createCustomFailureDetector(
implClass: String,
connections: Map[InetSocketAddress, ActorRef]): FailureDetector = {
ReflectiveAccess.createInstance(
implClass,
Array[Class[_]](classOf[Map[InetSocketAddress, ActorRef]]),
Array[AnyRef](connections)) match {
case Right(actor) actor
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
throw new ConfigurationException(
"Could not instantiate custom FailureDetector of [" +
implClass + "] due to: " +
cause, cause)
}
}
}
/**
* The FailureDetector acts like a middleman between the Router and
* the actor reference that does the routing and can dectect and act upon failure.
*
* Through the FailureDetector:
* <ol>
* <li>
* the actor ref can signal that something has changed in the known set of connections. The Router can see
* when a changed happened (by checking the version) and update its internal datastructures.
* </li>
* <li>
* the Router can indicate that some happened happened with a actor ref, e.g. the actor ref dying.
* </li>
* </ol>
*/
trait FailureDetector {
/**
* Returns true if the 'connection' is considered available.
*/
def isAvailable(connection: InetSocketAddress): Boolean
/**
* Records a successful connection.
*/
def recordSuccess(connection: InetSocketAddress, timestamp: Long)
/**
* Records a failed connection.
*/
def recordFailure(connection: InetSocketAddress, timestamp: Long)
/**
* A version that is useful to see if there is any change in the connections. If there is a change, a router is
* able to update its internal datastructures.
*/
def version: Long
/**
* Returns the number of connections. Value could be stale as soon as received, and this method can't be combined (easily)
* with an atomic read of and size and version.
*/
def size: Int
/**
* Stops all managed actors
*/
def stopAll()
/**
* Returns a VersionedIterator containing all connectected ActorRefs at some moment in time. Since there is
* the time element, also the version is included to be able to read the data (the connections) and the version
* in an atomic manner.
*
* This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable)
* view of some set of connections.
*/
def versionedIterable: VersionedIterable[ActorRef]
/**
* A callback that can be used to indicate that a connected actorRef was dead.
* <p/>
* Implementations should make sure that this method can be called without the actorRef being part of the
* current set of connections. The most logical way to deal with this situation, is just to ignore it. One of the
* reasons this can happen is that multiple thread could at the 'same' moment discover for the same ActorRef that
* not working.
*
* It could be that even after a remove has been called for a specific ActorRef, that the ActorRef
* is still being used. A good behaving Router will eventually discard this reference, but no guarantees are
* made how long this takes.
*
* @param ref the dead
*/
def remove(deadRef: ActorRef)
/**
* Fails over connections from one address to another.
*/
def failOver(from: InetSocketAddress, to: InetSocketAddress)
}

View file

@ -70,89 +70,6 @@ trait VersionedIterable[A] {
*/
class RoutingException(message: String) extends AkkaException(message)
/**
* Misc helper and factory methods for failure detection.
*/
object FailureDetector {
def createCustomFailureDetector(implClass: String, connections: Map[InetSocketAddress, ActorRef]): FailureDetector = {
ReflectiveAccess.createInstance(implClass, Array[Class[_]](classOf[Map[InetSocketAddress, ActorRef]]), Array[AnyRef](connections)) match {
case Right(actor) actor
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
throw new ConfigurationException("Could not instantiate custom FailureDetector of [" + implClass + "] due to: " + cause, cause)
}
}
}
/**
* The FailureDetector acts like a middleman between the Router and the actor reference that does the routing
* and can dectect and act upon failur.
*
* Through the FailureDetector:
* <ol>
* <li>
* the actor ref can signal that something has changed in the known set of connections. The Router can see
* when a changed happened (by checking the version) and update its internal datastructures.
* </li>
* <li>
* the Router can indicate that some happened happened with a actor ref, e.g. the actor ref dying.
* </li>
* </ol>
*/
trait FailureDetector {
/**
* A version that is useful to see if there is any change in the connections. If there is a change, a router is
* able to update its internal datastructures.
*/
def version: Long
/**
* Returns the number of connections. Value could be stale as soon as received, and this method can't be combined (easily)
* with an atomic read of and size and version.
*/
def size: Int
/**
* Stops all managed actors
*/
def stopAll()
/**
* Returns a VersionedIterator containing all connectected ActorRefs at some moment in time. Since there is
* the time element, also the version is included to be able to read the data (the connections) and the version
* in an atomic manner.
*
* This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable)
* view of some set of connections.
*/
def versionedIterable: VersionedIterable[ActorRef]
/**
* A callback that can be used to indicate that a connected actorRef was dead.
* <p/>
* Implementations should make sure that this method can be called without the actorRef being part of the
* current set of connections. The most logical way to deal with this situation, is just to ignore it. One of the
* reasons this can happen is that multiple thread could at the 'same' moment discover for the same ActorRef that
* not working.
*
* It could be that even after a remove has been called for a specific ActorRef, that the ActorRef
* is still being used. A good behaving Router will eventually discard this reference, but no guarantees are
* made how long this takes.
*
* @param ref the dead
*/
def remove(deadRef: ActorRef)
/**
* Fails over connections from one address to another.
*/
def failOver(from: InetSocketAddress, to: InetSocketAddress)
}
/**
* Default "local" failure detector. This failure detector removes an actor from the
* router if an exception occured in the router's thread (e.g. when trying to add
@ -160,7 +77,7 @@ trait FailureDetector {
*/
class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector {
case class State(val version: Long, val iterable: Iterable[ActorRef]) extends VersionedIterable[ActorRef]
case class State(version: Long, iterable: Iterable[ActorRef]) extends VersionedIterable[ActorRef]
private val state = new AtomicReference[State]
@ -169,6 +86,13 @@ class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector
state.set(State(Long.MinValue, connectionIterable))
}
def isAvailable(connection: InetSocketAddress): Boolean =
state.get.iterable.find(c connection == c).isDefined
def recordSuccess(connection: InetSocketAddress, timestamp: Long) {}
def recordFailure(connection: InetSocketAddress, timestamp: Long) {}
def version: Long = state.get.version
def size: Int = state.get.iterable.size
@ -328,6 +252,8 @@ private[akka] class RoutedActorRef(val routedProps: RoutedProps) extends Abstrac
/**
* An Abstract Router implementation that already provides the basic infrastructure so that a concrete
* Router only needs to implement the next method.
*
* FIXME: this is also the location where message buffering should be done in case of failure.
*/
trait BasicRouter extends Router {
@ -343,7 +269,7 @@ trait BasicRouter extends Router {
//it is a broadcast message, we are going to send to message to all connections.
connections.versionedIterable.iterable.foreach(actor
try {
actor.!(message)(sender)
actor.!(message)(sender) // we use original sender, so this is essentially a 'forward'
} catch {
case e: Exception
connections.remove(actor)
@ -354,7 +280,7 @@ trait BasicRouter extends Router {
next match {
case Some(actor)
try {
actor.!(message)(sender)
actor.!(message)(sender) // we use original sender, so this is essentially a 'forward'
} catch {
case e: Exception
connections.remove(actor)
@ -373,6 +299,7 @@ trait BasicRouter extends Router {
next match {
case Some(actor)
try {
// FIXME is this not wrong? it will not pass on and use the original Future but create a new one. Should reuse 'channel: UntypedChannel' in the AbstractRoutedActorRef
actor.?(message, timeout)(sender).asInstanceOf[Future[T]]
} catch {
case e: Exception
@ -407,6 +334,7 @@ class DirectRouter extends BasicRouter {
if (currentState.ref == null) None else Some(currentState.ref)
}
// FIXME rename all 'getState' methods to 'currentState', non-scala
@tailrec
private def getState: DirectRouterState = {
val currentState = state.get