Refactored state management in routing fail over.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-08-30 18:42:35 +02:00
parent eb2cf56467
commit e17a376236
3 changed files with 38 additions and 33 deletions

View file

@ -55,8 +55,12 @@ trait Router {
/**
* An Iterable that also contains a version.
*/
class VersionedIterable[A](val version: Long, val iterable: Iterable[A]) {
def apply() = iterable
trait VersionedIterable[A] {
val version: Long
def iterable: Iterable[A]
def apply(): Iterable[A] = iterable
}
/**
@ -137,11 +141,13 @@ trait FailureDetector {
*/
class LocalFailureDetector extends FailureDetector {
private val state = new AtomicReference[VersionedIterable[ActorRef]]
case class State(val version: Long, val iterable: Iterable[ActorRef]) extends VersionedIterable[ActorRef]
private val state = new AtomicReference[State]
def this(connectionIterable: Iterable[ActorRef]) = {
this()
state.set(new VersionedIterable[ActorRef](Long.MinValue, connectionIterable))
state.set(State(Long.MinValue, connectionIterable))
}
def version: Long = state.get.version
@ -166,7 +172,7 @@ class LocalFailureDetector extends FailureDetector {
if (newList.size != oldState.iterable.size) {
//one or more occurrences of the actorRef were removed, so we need to update the state.
val newState = new VersionedIterable[ActorRef](oldState.version + 1, newList)
val newState = State(oldState.version + 1, newList)
//if we are not able to update the state, we just try again.
if (!state.compareAndSet(oldState, newState)) remove(ref)
}
@ -196,9 +202,11 @@ object Routing {
if (!localOnly && !clusteringEnabled)
throw new IllegalArgumentException("Can't have clustered actor reference without the ClusterModule being enabled")
else if (clusteringEnabled && !props.localOnly) {
else if (clusteringEnabled && !props.localOnly)
ReflectiveAccess.ClusterModule.newClusteredActorRef(props).start()
} else {
else {
if (props.connections.isEmpty)
throw new IllegalArgumentException("A routed actorRef can't have an empty connection set")
@ -220,12 +228,16 @@ object Routing {
val router = routerType match {
case RouterType.Direct if connections.size > 1
throw new IllegalArgumentException("A direct router can't have more than 1 connection")
case RouterType.Direct
new DirectRouter
case RouterType.Random
new RandomRouter
case RouterType.RoundRobin
new RoundRobinRouter
case r
throw new IllegalArgumentException("Unsupported routerType " + r)
}
@ -274,8 +286,7 @@ abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) exte
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
* on (or more) of these actors.
*/
private[akka] class RoutedActorRef(val routedProps: RoutedProps)
extends AbstractRoutedActorRef(routedProps) {
private[akka] class RoutedActorRef(val routedProps: RoutedProps) extends AbstractRoutedActorRef(routedProps) {
router.init(new LocalFailureDetector(routedProps.connections))
@ -405,8 +416,7 @@ class DirectRouter extends BasicRouter {
}
}
private case class DirectRouterState(val ref: ActorRef, val version: Long)
private case class DirectRouterState(ref: ActorRef, version: Long)
}
/**
@ -546,5 +556,4 @@ trait ScatterGatherRouter extends BasicRouter with Serializable {
class ScatterGatherFirstCompletedRouter extends RoundRobinRouter with ScatterGatherRouter {
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Futures.firstCompletedOf(results)
}

View file

@ -46,8 +46,10 @@ object ClusterActorRef {
val failureDetectorFactory: (Map[InetSocketAddress, ActorRef]) FailureDetector = failureDetectorType match {
case RemoveConnectionOnFirstFailure
(connections: Map[InetSocketAddress, ActorRef]) new RemoveConnectionOnFirstFailureFailureDetector(connections)
case _
case Local
(connections: Map[InetSocketAddress, ActorRef]) new LocalFailureDetector
case _
(connections: Map[InetSocketAddress, ActorRef]) new RemoveConnectionOnFirstFailureFailureDetector(connections)
}
new ClusterActorRef(

View file

@ -56,22 +56,26 @@ object FailureDetector {
abstract class FailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef]) extends FailureDetector {
import ClusterActorRef._
case class State(val version: Long = Integer.MIN_VALUE, val connections: Map[InetSocketAddress, ActorRef]) extends VersionedIterable[ActorRef] {
def iterable: Iterable[ActorRef] = connections.values
}
// type C
private val state = new AtomicReference[State]()
state.set(new State(Long.MinValue, initialConnections))
state.set(State(Long.MinValue, initialConnections))
def version: Long = state.get().version
def versionedIterable = state.get
def size: Int = state.get.iterable.size
def size: Int = state.get.connections.size
def connections: Map[InetSocketAddress, ActorRef] = state.get.connections
def stopAll() {
state.get().connections.values foreach (_.stop()) // shut down all remote connections
state.get().iterable foreach (_.stop()) // shut down all remote connections
}
@tailrec
@ -90,7 +94,7 @@ abstract class FailureDetectorBase(initialConnections: Map[InetSocketAddress, Ac
if (changed) {
//there was a state change, so we are now going to update the state.
val newState = new State(oldState.version + 1, newMap)
val newState = State(oldState.version + 1, newMap)
//if we are not able to update, the state, we are going to try again.
if (!state.compareAndSet(oldState, newState)) failOver(from, to)
@ -98,18 +102,18 @@ abstract class FailureDetectorBase(initialConnections: Map[InetSocketAddress, Ac
}
@tailrec
final def remove(deadRef: ActorRef) {
EventHandler.debug(this, "ClusterActorRef remove [%s]".format(deadRef.uuid))
final def remove(faultyConnection: ActorRef) {
EventHandler.debug(this, "ClusterActorRef remove [%s]".format(faultyConnection.uuid))
val oldState = state.get()
var changed = false
//remote the deadRef from the clustered-connections.
//remote the faultyConnection from the clustered-connections.
var newConnections = Map.empty[InetSocketAddress, ActorRef]
oldState.connections.keys foreach { address
val actorRef: ActorRef = oldState.connections.get(address).get
if (actorRef ne deadRef) {
if (actorRef ne faultyConnection) {
newConnections = newConnections + ((address, actorRef))
} else {
changed = true
@ -118,22 +122,12 @@ abstract class FailureDetectorBase(initialConnections: Map[InetSocketAddress, Ac
if (changed) {
//one or more occurrances of the actorRef were removed, so we need to update the state.
val newState = new State(oldState.version + 1, newConnections)
val newState = State(oldState.version + 1, newConnections)
//if we are not able to update the state, we just try again.
if (!state.compareAndSet(oldState, newState)) remove(deadRef)
if (!state.compareAndSet(oldState, newState)) remove(faultyConnection)
}
}
class State(version: Long = Integer.MIN_VALUE,
val connections: Map[InetSocketAddress, ActorRef])
extends VersionedIterable[ActorRef](version, connections.values)
// class State[C](version: Long = Integer.MIN_VALUE,
// val connections: Map[InetSocketAddress, ActorRef],
// val explicitConnections: Iterable[ActorRef],
// val context: C)
// extends VersionedIterable[ActorRef](version, explicitConnections ++ connections.values)
}
trait RemoteFailureListener {