diff --git a/akka-actor/src/main/scala/akka/routing/FailureDetector.scala b/akka-actor/src/main/scala/akka/routing/FailureDetector.scala new file mode 100644 index 0000000000..daf6f29067 --- /dev/null +++ b/akka-actor/src/main/scala/akka/routing/FailureDetector.scala @@ -0,0 +1,127 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.routing + +import akka.AkkaException +import akka.actor._ +import akka.event.EventHandler +import akka.config.ConfigurationException +import akka.actor.UntypedChannel._ +import akka.dispatch.{ Future, Futures } +import akka.util.ReflectiveAccess + +import java.net.InetSocketAddress +import java.lang.reflect.InvocationTargetException +import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } + +import scala.annotation.tailrec + +/** + * Misc helper and factory methods for failure detection. + */ +object FailureDetector { + + def createCustomFailureDetector( + implClass: String, + connections: Map[InetSocketAddress, ActorRef]): FailureDetector = { + + ReflectiveAccess.createInstance( + implClass, + Array[Class[_]](classOf[Map[InetSocketAddress, ActorRef]]), + Array[AnyRef](connections)) match { + case Right(actor) ⇒ actor + case Left(exception) ⇒ + val cause = exception match { + case i: InvocationTargetException ⇒ i.getTargetException + case _ ⇒ exception + } + throw new ConfigurationException( + "Could not instantiate custom FailureDetector of [" + + implClass + "] due to: " + + cause, cause) + } + } +} + +/** + * The FailureDetector acts like a middleman between the Router and + * the actor reference that does the routing and can dectect and act upon failure. + * + * Through the FailureDetector: + *
    + *
  1. + * the actor ref can signal that something has changed in the known set of connections. The Router can see + * when a changed happened (by checking the version) and update its internal datastructures. + *
  2. + *
  3. + * the Router can indicate that some happened happened with a actor ref, e.g. the actor ref dying. + *
  4. + *
+ */ +trait FailureDetector { + + /** + * Returns true if the 'connection' is considered available. + */ + def isAvailable(connection: InetSocketAddress): Boolean + + /** + * Records a successful connection. + */ + def recordSuccess(connection: InetSocketAddress, timestamp: Long) + + /** + * Records a failed connection. + */ + def recordFailure(connection: InetSocketAddress, timestamp: Long) + + /** + * A version that is useful to see if there is any change in the connections. If there is a change, a router is + * able to update its internal datastructures. + */ + def version: Long + + /** + * Returns the number of connections. Value could be stale as soon as received, and this method can't be combined (easily) + * with an atomic read of and size and version. + */ + def size: Int + + /** + * Stops all managed actors + */ + def stopAll() + + /** + * Returns a VersionedIterator containing all connectected ActorRefs at some moment in time. Since there is + * the time element, also the version is included to be able to read the data (the connections) and the version + * in an atomic manner. + * + * This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable) + * view of some set of connections. + */ + def versionedIterable: VersionedIterable[ActorRef] + + /** + * A callback that can be used to indicate that a connected actorRef was dead. + *

+ * Implementations should make sure that this method can be called without the actorRef being part of the + * current set of connections. The most logical way to deal with this situation, is just to ignore it. One of the + * reasons this can happen is that multiple thread could at the 'same' moment discover for the same ActorRef that + * not working. + * + * It could be that even after a remove has been called for a specific ActorRef, that the ActorRef + * is still being used. A good behaving Router will eventually discard this reference, but no guarantees are + * made how long this takes. + * + * @param ref the dead + */ + def remove(deadRef: ActorRef) + + /** + * Fails over connections from one address to another. + */ + def failOver(from: InetSocketAddress, to: InetSocketAddress) +} diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index b2a51dab46..95344fe157 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -70,89 +70,6 @@ trait VersionedIterable[A] { */ class RoutingException(message: String) extends AkkaException(message) -/** - * Misc helper and factory methods for failure detection. - */ -object FailureDetector { - - def createCustomFailureDetector(implClass: String, connections: Map[InetSocketAddress, ActorRef]): FailureDetector = { - ReflectiveAccess.createInstance(implClass, Array[Class[_]](classOf[Map[InetSocketAddress, ActorRef]]), Array[AnyRef](connections)) match { - case Right(actor) ⇒ actor - case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } - throw new ConfigurationException("Could not instantiate custom FailureDetector of [" + implClass + "] due to: " + cause, cause) - } - } -} -/** - * The FailureDetector acts like a middleman between the Router and the actor reference that does the routing - * and can dectect and act upon failur. - * - * Through the FailureDetector: - *

    - *
  1. - * the actor ref can signal that something has changed in the known set of connections. The Router can see - * when a changed happened (by checking the version) and update its internal datastructures. - *
  2. - *
  3. - * the Router can indicate that some happened happened with a actor ref, e.g. the actor ref dying. - *
  4. - *
- */ -trait FailureDetector { - - /** - * A version that is useful to see if there is any change in the connections. If there is a change, a router is - * able to update its internal datastructures. - */ - def version: Long - - /** - * Returns the number of connections. Value could be stale as soon as received, and this method can't be combined (easily) - * with an atomic read of and size and version. - */ - def size: Int - - /** - * Stops all managed actors - */ - def stopAll() - - /** - * Returns a VersionedIterator containing all connectected ActorRefs at some moment in time. Since there is - * the time element, also the version is included to be able to read the data (the connections) and the version - * in an atomic manner. - * - * This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable) - * view of some set of connections. - */ - def versionedIterable: VersionedIterable[ActorRef] - - /** - * A callback that can be used to indicate that a connected actorRef was dead. - *

- * Implementations should make sure that this method can be called without the actorRef being part of the - * current set of connections. The most logical way to deal with this situation, is just to ignore it. One of the - * reasons this can happen is that multiple thread could at the 'same' moment discover for the same ActorRef that - * not working. - * - * It could be that even after a remove has been called for a specific ActorRef, that the ActorRef - * is still being used. A good behaving Router will eventually discard this reference, but no guarantees are - * made how long this takes. - * - * @param ref the dead - */ - def remove(deadRef: ActorRef) - - /** - * Fails over connections from one address to another. - */ - def failOver(from: InetSocketAddress, to: InetSocketAddress) -} - /** * Default "local" failure detector. This failure detector removes an actor from the * router if an exception occured in the router's thread (e.g. when trying to add @@ -160,7 +77,7 @@ trait FailureDetector { */ class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector { - case class State(val version: Long, val iterable: Iterable[ActorRef]) extends VersionedIterable[ActorRef] + case class State(version: Long, iterable: Iterable[ActorRef]) extends VersionedIterable[ActorRef] private val state = new AtomicReference[State] @@ -169,6 +86,13 @@ class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector state.set(State(Long.MinValue, connectionIterable)) } + def isAvailable(connection: InetSocketAddress): Boolean = + state.get.iterable.find(c ⇒ connection == c).isDefined + + def recordSuccess(connection: InetSocketAddress, timestamp: Long) {} + + def recordFailure(connection: InetSocketAddress, timestamp: Long) {} + def version: Long = state.get.version def size: Int = state.get.iterable.size @@ -328,6 +252,8 @@ private[akka] class RoutedActorRef(val routedProps: RoutedProps) extends Abstrac /** * An Abstract Router implementation that already provides the basic infrastructure so that a concrete * Router only needs to implement the next method. + * + * FIXME: this is also the location where message buffering should be done in case of failure. */ trait BasicRouter extends Router { @@ -343,7 +269,7 @@ trait BasicRouter extends Router { //it is a broadcast message, we are going to send to message to all connections. connections.versionedIterable.iterable.foreach(actor ⇒ try { - actor.!(message)(sender) + actor.!(message)(sender) // we use original sender, so this is essentially a 'forward' } catch { case e: Exception ⇒ connections.remove(actor) @@ -354,7 +280,7 @@ trait BasicRouter extends Router { next match { case Some(actor) ⇒ try { - actor.!(message)(sender) + actor.!(message)(sender) // we use original sender, so this is essentially a 'forward' } catch { case e: Exception ⇒ connections.remove(actor) @@ -373,6 +299,7 @@ trait BasicRouter extends Router { next match { case Some(actor) ⇒ try { + // FIXME is this not wrong? it will not pass on and use the original Future but create a new one. Should reuse 'channel: UntypedChannel' in the AbstractRoutedActorRef actor.?(message, timeout)(sender).asInstanceOf[Future[T]] } catch { case e: Exception ⇒ @@ -407,6 +334,7 @@ class DirectRouter extends BasicRouter { if (currentState.ref == null) None else Some(currentState.ref) } + // FIXME rename all 'getState' methods to 'currentState', non-scala @tailrec private def getState: DirectRouterState = { val currentState = state.get