diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 6efbc2b803..b60fdc578b 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -55,8 +55,12 @@ trait Router { /** * An Iterable that also contains a version. */ -class VersionedIterable[A](val version: Long, val iterable: Iterable[A]) { - def apply() = iterable +trait VersionedIterable[A] { + val version: Long + + def iterable: Iterable[A] + + def apply(): Iterable[A] = iterable } /** @@ -137,11 +141,13 @@ trait FailureDetector { */ class LocalFailureDetector extends FailureDetector { - private val state = new AtomicReference[VersionedIterable[ActorRef]] + case class State(val version: Long, val iterable: Iterable[ActorRef]) extends VersionedIterable[ActorRef] + + private val state = new AtomicReference[State] def this(connectionIterable: Iterable[ActorRef]) = { this() - state.set(new VersionedIterable[ActorRef](Long.MinValue, connectionIterable)) + state.set(State(Long.MinValue, connectionIterable)) } def version: Long = state.get.version @@ -166,7 +172,7 @@ class LocalFailureDetector extends FailureDetector { if (newList.size != oldState.iterable.size) { //one or more occurrences of the actorRef were removed, so we need to update the state. - val newState = new VersionedIterable[ActorRef](oldState.version + 1, newList) + val newState = State(oldState.version + 1, newList) //if we are not able to update the state, we just try again. if (!state.compareAndSet(oldState, newState)) remove(ref) } @@ -196,9 +202,11 @@ object Routing { if (!localOnly && !clusteringEnabled) throw new IllegalArgumentException("Can't have clustered actor reference without the ClusterModule being enabled") - else if (clusteringEnabled && !props.localOnly) { + + else if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props).start() - } else { + + else { if (props.connections.isEmpty) throw new IllegalArgumentException("A routed actorRef can't have an empty connection set") @@ -220,12 +228,16 @@ object Routing { val router = routerType match { case RouterType.Direct if connections.size > 1 ⇒ throw new IllegalArgumentException("A direct router can't have more than 1 connection") + case RouterType.Direct ⇒ new DirectRouter + case RouterType.Random ⇒ new RandomRouter + case RouterType.RoundRobin ⇒ new RoundRobinRouter + case r ⇒ throw new IllegalArgumentException("Unsupported routerType " + r) } @@ -274,8 +286,7 @@ abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) exte * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to * on (or more) of these actors. */ -private[akka] class RoutedActorRef(val routedProps: RoutedProps) - extends AbstractRoutedActorRef(routedProps) { +private[akka] class RoutedActorRef(val routedProps: RoutedProps) extends AbstractRoutedActorRef(routedProps) { router.init(new LocalFailureDetector(routedProps.connections)) @@ -405,8 +416,7 @@ class DirectRouter extends BasicRouter { } } - private case class DirectRouterState(val ref: ActorRef, val version: Long) - + private case class DirectRouterState(ref: ActorRef, version: Long) } /** @@ -546,5 +556,4 @@ trait ScatterGatherRouter extends BasicRouter with Serializable { class ScatterGatherFirstCompletedRouter extends RoundRobinRouter with ScatterGatherRouter { protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Futures.firstCompletedOf(results) - } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index e073189b55..424854ee71 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -46,8 +46,10 @@ object ClusterActorRef { val failureDetectorFactory: (Map[InetSocketAddress, ActorRef]) ⇒ FailureDetector = failureDetectorType match { case RemoveConnectionOnFirstFailure ⇒ (connections: Map[InetSocketAddress, ActorRef]) ⇒ new RemoveConnectionOnFirstFailureFailureDetector(connections) - case _ ⇒ + case Local ⇒ (connections: Map[InetSocketAddress, ActorRef]) ⇒ new LocalFailureDetector + case _ ⇒ + (connections: Map[InetSocketAddress, ActorRef]) ⇒ new RemoveConnectionOnFirstFailureFailureDetector(connections) } new ClusterActorRef( diff --git a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala index 0dec42f168..bf3227e95a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala @@ -56,22 +56,26 @@ object FailureDetector { abstract class FailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef]) extends FailureDetector { import ClusterActorRef._ + case class State(val version: Long = Integer.MIN_VALUE, val connections: Map[InetSocketAddress, ActorRef]) extends VersionedIterable[ActorRef] { + def iterable: Iterable[ActorRef] = connections.values + } + // type C private val state = new AtomicReference[State]() - state.set(new State(Long.MinValue, initialConnections)) + state.set(State(Long.MinValue, initialConnections)) def version: Long = state.get().version def versionedIterable = state.get - def size: Int = state.get.iterable.size + def size: Int = state.get.connections.size def connections: Map[InetSocketAddress, ActorRef] = state.get.connections def stopAll() { - state.get().connections.values foreach (_.stop()) // shut down all remote connections + state.get().iterable foreach (_.stop()) // shut down all remote connections } @tailrec @@ -90,7 +94,7 @@ abstract class FailureDetectorBase(initialConnections: Map[InetSocketAddress, Ac if (changed) { //there was a state change, so we are now going to update the state. - val newState = new State(oldState.version + 1, newMap) + val newState = State(oldState.version + 1, newMap) //if we are not able to update, the state, we are going to try again. if (!state.compareAndSet(oldState, newState)) failOver(from, to) @@ -98,18 +102,18 @@ abstract class FailureDetectorBase(initialConnections: Map[InetSocketAddress, Ac } @tailrec - final def remove(deadRef: ActorRef) { - EventHandler.debug(this, "ClusterActorRef remove [%s]".format(deadRef.uuid)) + final def remove(faultyConnection: ActorRef) { + EventHandler.debug(this, "ClusterActorRef remove [%s]".format(faultyConnection.uuid)) val oldState = state.get() var changed = false - //remote the deadRef from the clustered-connections. + //remote the faultyConnection from the clustered-connections. var newConnections = Map.empty[InetSocketAddress, ActorRef] oldState.connections.keys foreach { address ⇒ val actorRef: ActorRef = oldState.connections.get(address).get - if (actorRef ne deadRef) { + if (actorRef ne faultyConnection) { newConnections = newConnections + ((address, actorRef)) } else { changed = true @@ -118,22 +122,12 @@ abstract class FailureDetectorBase(initialConnections: Map[InetSocketAddress, Ac if (changed) { //one or more occurrances of the actorRef were removed, so we need to update the state. - val newState = new State(oldState.version + 1, newConnections) + val newState = State(oldState.version + 1, newConnections) //if we are not able to update the state, we just try again. - if (!state.compareAndSet(oldState, newState)) remove(deadRef) + if (!state.compareAndSet(oldState, newState)) remove(faultyConnection) } } - - class State(version: Long = Integer.MIN_VALUE, - val connections: Map[InetSocketAddress, ActorRef]) - extends VersionedIterable[ActorRef](version, connections.values) - - // class State[C](version: Long = Integer.MIN_VALUE, - // val connections: Map[InetSocketAddress, ActorRef], - // val explicitConnections: Iterable[ActorRef], - // val context: C) - // extends VersionedIterable[ActorRef](version, explicitConnections ++ connections.values) } trait RemoteFailureListener {