diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 636bc6b364..57b5ae39f8 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -5,20 +5,21 @@ package akka.actor import DeploymentConfig._ -import akka.experimental import akka.dispatch._ import akka.config._ +import akka.routing._ import Config._ import akka.util.{ ReflectiveAccess, Duration } import ReflectiveAccess._ import akka.cluster.RemoteSupport -import akka.japi.{ Creator, Procedure } -import akka.AkkaException -import akka.serialization.{ Serializer, Serialization } import akka.cluster.ClusterNode +import akka.japi.{ Creator, Procedure } +import akka.serialization.{ Serializer, Serialization } import akka.event.EventHandler -import scala.collection.immutable.Stack +import akka.experimental +import akka.AkkaException +import scala.collection.immutable.Stack import scala.reflect.BeanProperty import com.eaio.uuid.UUID @@ -496,7 +497,7 @@ object Actor { cluster.store(address, factory, replicas.factor, replicationScheme, false, serializer) // remote node (not home node), check out as ClusterActorRef - cluster.ref(address, DeploymentConfig.routerTypeFor(router)) + cluster.ref(address, DeploymentConfig.routerTypeFor(router), FailureDetectorType.RemoveConnectionOnFirstFailure) //DeploymentConfig.failureDetectorTypeFor(failureDetector)) } replication match { diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala index 79cf40f50a..e6345ea1dc 100644 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala @@ -23,6 +23,7 @@ object DeploymentConfig { address: String, recipe: Option[ActorRecipe], routing: Routing = Direct, + // failureDetector: FailureDetector = RemoveConnectionOnFirstFailure, scope: Scope = Local) { Address.validate(address) } @@ -54,6 +55,17 @@ object DeploymentConfig { case object LeastRAM extends Routing case object LeastMessages extends Routing + // -------------------------------- + // --- FailureDetector + // -------------------------------- + sealed trait FailureDetector + + // For Java API + case class RemoveConnectionOnFirstFailure() extends FailureDetector + + // For Scala API + case object RemoveConnectionOnFirstFailure extends FailureDetector + // -------------------------------- // --- Scope // -------------------------------- @@ -81,7 +93,10 @@ object DeploymentConfig { // --- Replicas // -------------------------------- - case class ReplicationFactor(val factor: Int) { + object ReplicationFactor { + def apply(factor: Int) = new ReplicationFactor(factor) + } + class ReplicationFactor(val factor: Int) { if (factor < 0) throw new IllegalArgumentException("replication-factor can not be negative") } @@ -148,6 +163,11 @@ object DeploymentConfig { def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home ⇒ nodeNameFor(home) == Config.nodename) + // def failureDetectorTypeFor(failureDetector: FailureDetector): FailureDetectorType = FailureDetectorType match { + // case RemoveConnectionOnFirstFailure ⇒ FailureDetectorType.RemoveConnectionOnFirstFailure + // case unknown ⇒ throw new UnsupportedOperationException("Unknown FailureDetector [" + unknown + "]") + // } + def routerTypeFor(routing: Routing): RouterType = routing match { case Direct ⇒ RouterType.Direct case Direct() ⇒ RouterType.Direct diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index b5acf3f981..38ab4ad6df 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -4,13 +4,15 @@ package akka.cluster -import akka.cluster.RemoteSupport -import akka.serialization.Serializer import akka.actor._ import DeploymentConfig._ import akka.dispatch.Future import akka.config.Config -import akka.routing.RouterType +import akka.routing._ +import akka.serialization.Serializer +import akka.cluster.metrics._ +import akka.util.Duration +import akka.util.duration._ import akka.AkkaException import com.eaio.uuid.UUID @@ -18,11 +20,6 @@ import com.eaio.uuid.UUID import java.net.InetSocketAddress import java.util.concurrent.{ ConcurrentSkipListSet } -import akka.cluster.metrics._ - -import akka.util.Duration -import akka.util.duration._ - class ClusterException(message: String) extends AkkaException(message) object ChangeListener { @@ -427,7 +424,7 @@ trait ClusterNode { /** * Creates an ActorRef with a Router to a set of clustered actors. */ - def ref(actorAddress: String, router: RouterType): ActorRef + def ref(actorAddress: String, router: RouterType, failureDetector: FailureDetectorType): ActorRef /** * Returns the addresses of all actors checked out on this node. diff --git a/akka-actor/src/main/scala/akka/routing/RoutedProps.scala b/akka-actor/src/main/scala/akka/routing/RoutedProps.scala new file mode 100644 index 0000000000..8262fcc5d9 --- /dev/null +++ b/akka-actor/src/main/scala/akka/routing/RoutedProps.scala @@ -0,0 +1,205 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.routing + +import akka.actor._ +import akka.util.ReflectiveAccess + +import java.net.InetSocketAddress + +import scala.collection.JavaConversions.iterableAsScalaIterable + +sealed trait FailureDetectorType + +/** + * Used for declarative configuration of failure detection in Routing. + * + * @author Jonas Bonér + */ +object FailureDetectorType { + + object Local extends FailureDetectorType + + object RemoveConnectionOnFirstFailure extends FailureDetectorType +} + +sealed trait RouterType + +/** + * Used for declarative configuration of Routing. + * + * @author Jonas Bonér + */ +object RouterType { + + object Direct extends RouterType + + /** + * A RouterType that randomly selects a connection to send a message to. + */ + object Random extends RouterType + + /** + * A RouterType that selects the connection by using round robin. + */ + object RoundRobin extends RouterType + + /** + * A RouterType that selects the connection based on the least amount of cpu usage + */ + object LeastCPU extends RouterType + + /** + * A RouterType that select the connection based on the least amount of ram used. + * + * FIXME: this is extremely vague currently since there are so many ways to define least amount of ram. + */ + object LeastRAM extends RouterType + + /** + * A RouterType that select the connection where the actor has the least amount of messages in its mailbox. + */ + object LeastMessages extends RouterType + + /** + * A user-defined custom RouterType. + */ + object Custom extends RouterType + +} + +object RoutedProps { + + final val defaultTimeout = Actor.TIMEOUT + final val defaultRouterFactory = () ⇒ new RoundRobinRouter + final val defaultDeployId = "" + final val defaultLocalOnly = !ReflectiveAccess.ClusterModule.isEnabled + final val defaultFailureDetectorFactory = (connections: Map[InetSocketAddress, ActorRef]) ⇒ new LocalFailureDetector(connections.values) + + /** + * The default RoutedProps instance, uses the settings from the RoutedProps object starting with default* + */ + final val default = new RoutedProps + + def apply(): RoutedProps = default +} + +/** + * Contains the configuration to create local and clustered routed actor references. + * + * Routed ActorRef configuration object, this is thread safe and fully sharable. + * + * Because the Routers are stateful, a new Router instance needs to be created for every ActorRef that relies on routing + * (currently the ClusterActorRef and the RoutedActorRef). That is why a Router factory is used (a function that returns + * a new Router instance) instead of a single Router instance. This makes sharing the same RoutedProps between multiple + * threads safe. + * + * This configuration object makes it possible to either. + */ +case class RoutedProps( + routerFactory: () ⇒ Router, + failureDetectorFactory: (Map[InetSocketAddress, ActorRef]) ⇒ FailureDetector, + deployId: String, + connections: Iterable[ActorRef], + timeout: Timeout, + localOnly: Boolean) { + + def this() = this( + routerFactory = RoutedProps.defaultRouterFactory, + failureDetectorFactory = RoutedProps.defaultFailureDetectorFactory, + deployId = RoutedProps.defaultDeployId, + connections = List(), + timeout = RoutedProps.defaultTimeout, + localOnly = RoutedProps.defaultLocalOnly) + + /** + * Returns a new RoutedProps with the specified deployId set + * + * Java and Scala API + */ + def withDeployId(id: String): RoutedProps = copy(deployId = if (id eq null) "" else id) + + /** + * Returns a new RoutedProps configured with a random router. + * + * Java and Scala API. + */ + def withRandomRouter: RoutedProps = copy(routerFactory = () ⇒ new RandomRouter) + + /** + * Returns a new RoutedProps configured with a round robin router. + * + * Java and Scala API. + */ + def withRoundRobinRouter: RoutedProps = copy(routerFactory = () ⇒ new RoundRobinRouter) + + /** + * Returns a new RoutedProps configured with a direct router. + * + * Java and Scala API. + */ + def withDirectRouter: RoutedProps = copy(routerFactory = () ⇒ new DirectRouter) + + /** + * Makes it possible to change the default behavior in a clustered environment that a clustered actor ref is created. + * In some cases you just want to have local actor references, even though the Cluster Module is up and running. + * + * Java and Scala API. + */ + def withLocalOnly(l: Boolean = true) = copy(localOnly = l) + + /** + * Sets the Router factory method to use. Since Router instance contain state, and should be linked to a single 'routed' ActorRef, a new + * Router instance is needed for every 'routed' ActorRef. That is why a 'factory' function is used to create new + * instances. + * + * Scala API. + */ + def withRouter(f: () ⇒ Router): RoutedProps = copy(routerFactory = f) + + /** + * Sets the RouterFactory to use. Since Router instance contain state, and should be linked to a single 'routed' ActorRef, a new + * Router instance is needed for every 'routed' ActorRef. That is why a RouterFactory interface is used to create new + * instances. + * + * Java API. + */ + def withRouter(f: RouterFactory): RoutedProps = copy(routerFactory = () ⇒ f.newRouter()) + + /** + * + */ + def withTimeout(t: Timeout): RoutedProps = copy(timeout = t) + + /** + * Sets the connections to use. + * + * Scala API. + */ + def withConnections(c: Iterable[ActorRef]): RoutedProps = copy(connections = c) + + /** + * Sets the connections to use. + * + * Java API. + */ + def withConnections(c: java.lang.Iterable[ActorRef]): RoutedProps = copy(connections = iterableAsScalaIterable(c)) + + /** + * Returns a new RoutedProps configured with a FailureDetector factory. + * + * Scala API. + */ + def withFailureDetector(failureDetectorFactory: (Map[InetSocketAddress, ActorRef]) ⇒ FailureDetector): RoutedProps = + copy(failureDetectorFactory = failureDetectorFactory) + + /** + * Returns a new RoutedProps configured with a FailureDetector factory. + * + * Java API. + */ + def withFailureDetector(failureDetectorFactory: akka.japi.Function[Map[InetSocketAddress, ActorRef], FailureDetector]): RoutedProps = + copy(failureDetectorFactory = (connections: Map[InetSocketAddress, ActorRef]) ⇒ failureDetectorFactory.apply(connections)) +} diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index bd2b86e3e4..6efbc2b803 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -4,181 +4,21 @@ package akka.routing -import annotation.tailrec - import akka.AkkaException import akka.actor._ import akka.event.EventHandler import akka.actor.UntypedChannel._ - -import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } import akka.dispatch.{ Future, Futures } import akka.util.ReflectiveAccess -import collection.JavaConversions.iterableAsScalaIterable -sealed trait RouterType +import java.net.InetSocketAddress +import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } -/** - * Used for declarative configuration of Routing. - * - * @author Jonas Bonér - */ -object RouterType { - - object Direct extends RouterType - - /** - * A RouterType that randomly selects a connection to send a message to. - */ - object Random extends RouterType - - /** - * A RouterType that selects the connection by using round robin. - */ - object RoundRobin extends RouterType - - /** - * A RouterType that selects the connection based on the least amount of cpu usage - */ - object LeastCPU extends RouterType - - /** - * A RouterType that select the connection based on the least amount of ram used. - * - * FIXME: this is extremely vague currently since there are so many ways to define least amount of ram. - */ - object LeastRAM extends RouterType - - /** - * A RouterType that select the connection where the actor has the least amount of messages in its mailbox. - */ - object LeastMessages extends RouterType - - /** - * A user-defined custom RouterType. - */ - object Custom extends RouterType - -} - -object RoutedProps { - - final val defaultTimeout = Actor.TIMEOUT - final val defaultRouterFactory = () ⇒ new RoundRobinRouter - final val defaultDeployId = "" - final val defaultLocalOnly = !ReflectiveAccess.ClusterModule.isEnabled - - /** - * The default RoutedProps instance, uses the settings from the RoutedProps object starting with default* - */ - final val default = new RoutedProps - - def apply(): RoutedProps = default -} - -/** - * Contains the configuration to create local and clustered routed actor references. - * - * Routed ActorRef configuration object, this is thread safe and fully sharable. - * - * Because the Routers are stateful, a new Router instance needs to be created for every ActorRef that relies on routing - * (currently the ClusterActorRef and the RoutedActorRef). That is why a Router factory is used (a function that returns - * a new Router instance) instead of a single Router instance. This makes sharing the same RoutedProps between multiple - * threads safe. - * - * This configuration object makes it possible to either - */ -case class RoutedProps( - routerFactory: () ⇒ Router, - deployId: String, - connections: Iterable[ActorRef], - timeout: Timeout, - localOnly: Boolean) { - - def this() = this( - routerFactory = RoutedProps.defaultRouterFactory, - deployId = RoutedProps.defaultDeployId, - connections = List(), - timeout = RoutedProps.defaultTimeout, - localOnly = RoutedProps.defaultLocalOnly) - - /** - * Returns a new RoutedProps with the specified deployId set - * - * Java and Scala API - */ - def withDeployId(id: String): RoutedProps = copy(deployId = if (id eq null) "" else id) - - /** - * Returns a new RoutedProps configured with a random router. - * - * Java and Scala API. - */ - def withRandomRouter(): RoutedProps = copy(routerFactory = () ⇒ new RandomRouter) - - /** - * Returns a new RoutedProps configured with a round robin router. - * - * Java and Scala API. - */ - def withRoundRobinRouter(): RoutedProps = copy(routerFactory = () ⇒ new RoundRobinRouter) - - /** - * Returns a new RoutedProps configured with a direct router. - * - * Java and Scala API. - */ - def withDirectRouter(): RoutedProps = copy(routerFactory = () ⇒ new DirectRouter) - - /** - * Makes it possible to change the default behavior in a clustered environment that a clustered actor ref is created. - * In some cases you just want to have local actor references, even though the Cluster Module is up and running. - * - * Java and Scala API. - */ - def withLocalOnly(l: Boolean = true) = copy(localOnly = l) - - /** - * Sets the Router factory method to use. Since Router instance contain state, and should be linked to a single 'routed' ActorRef, a new - * Router instance is needed for every 'routed' ActorRef. That is why a 'factory' function is used to create new - * instances. - * - * Scala API. - */ - def withRouter(f: () ⇒ Router): RoutedProps = copy(routerFactory = f) - - /** - * Sets the RouterFactory to use. Since Router instance contain state, and should be linked to a single 'routed' ActorRef, a new - * Router instance is needed for every 'routed' ActorRef. That is why a RouterFactory interface is used to create new - * instances. - * - * Java API. - */ - def withRouter(f: RouterFactory): RoutedProps = copy(routerFactory = () ⇒ f.newRouter()) - - /** - * - */ - def withTimeout(t: Timeout): RoutedProps = copy(timeout = t) - - /** - * Sets the connections to use. - * - * Scala API. - */ - def withConnections(c: Iterable[ActorRef]): RoutedProps = copy(connections = c) - - /** - * Sets the connections to use. - * - * Java API. - */ - def withConnections(c: java.lang.Iterable[ActorRef]): RoutedProps = copy(connections = iterableAsScalaIterable(c)) -} +import scala.annotation.tailrec /** * The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the - * {@link RouterConnections} and each Router should be linked to only one {@link RouterConnections}. + * {@link FailureDetector} and each Router should be linked to only one {@link FailureDetector}. * * @author Jonas Bonér */ @@ -194,7 +34,7 @@ trait Router { * JMM Guarantees: * This method guarantees that all changes made in this method, are visible before one of the routing methods is called. */ - def init(connections: RouterConnections) + def init(connections: FailureDetector) /** * Routes the message to one of the connections. @@ -212,14 +52,23 @@ trait Router { def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] } +/** + * An Iterable that also contains a version. + */ +class VersionedIterable[A](val version: Long, val iterable: Iterable[A]) { + def apply() = iterable +} + /** * An {@link AkkaException} thrown when something goes wrong while routing a message */ class RoutingException(message: String) extends AkkaException(message) /** - * The RouterConnection acts like a middleman between the Router and the actor reference that does the routing. - * Through the RouterConnection: + * The FailureDetector acts like a middleman between the Router and the actor reference that does the routing + * and can dectect and act upon failur. + * + * Through the FailureDetector: *
    *
  1. * the actor ref can signal that something has changed in the known set of connections. The Router can see @@ -229,10 +78,8 @@ class RoutingException(message: String) extends AkkaException(message) * the Router can indicate that some happened happened with a actor ref, e.g. the actor ref dying. *
  2. *
- * - * It is very likely that the implementation of the RouterConnection will be part of the ActorRef itself. */ -trait RouterConnections { +trait FailureDetector { /** * A version that is useful to see if there is any change in the connections. If there is a change, a router is @@ -246,6 +93,11 @@ trait RouterConnections { */ def size: Int + /** + * Stops all managed actors + */ + def stopAll() + /** * Returns a VersionedIterator containing all connectected ActorRefs at some moment in time. Since there is * the time element, also the version is included to be able to read the data (the connections) and the version @@ -271,12 +123,55 @@ trait RouterConnections { * @param ref the dead */ def remove(deadRef: ActorRef) + + /** + * Fails over connections from one address to another. + */ + def failOver(from: InetSocketAddress, to: InetSocketAddress) } /** - * An Iterable that also contains a version. + * Default "local" failure detector. This failure detector removes an actor from the + * router if an exception occured in the router's thread (e.g. when trying to add + * the message to the receiver's mailbox). */ -case class VersionedIterable[A](version: Long, val iterable: Iterable[A]) +class LocalFailureDetector extends FailureDetector { + + private val state = new AtomicReference[VersionedIterable[ActorRef]] + + def this(connectionIterable: Iterable[ActorRef]) = { + this() + state.set(new VersionedIterable[ActorRef](Long.MinValue, connectionIterable)) + } + + def version: Long = state.get.version + + def size: Int = state.get.iterable.size + + def versionedIterable = state.get + + def stopAll() { + state.get.iterable foreach (_.stop()) + } + + def failOver(from: InetSocketAddress, to: InetSocketAddress) {} // do nothing here + + @tailrec + final def remove(ref: ActorRef) = { + val oldState = state.get + + //remote the ref from the connections. + var newList = oldState.iterable.filter(currentActorRef ⇒ currentActorRef ne ref) + + if (newList.size != oldState.iterable.size) { + //one or more occurrences of the actorRef were removed, so we need to update the state. + + val newState = new VersionedIterable[ActorRef](oldState.version + 1, newList) + //if we are not able to update the state, we just try again. + if (!state.compareAndSet(oldState, newState)) remove(ref) + } + } +} /** * A Helper class to create actor references that use routing. @@ -320,7 +215,7 @@ object Routing { * @throws IllegalArgumentException if the number of connections is zero, or if it depends on the actual router implementation * how many connections it can handle. */ - @deprecated("will be removed") + @deprecated("Use 'Routing.actorOf(props: RoutedProps)' instead.", "2.0") def actorOf(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): ActorRef = { val router = routerType match { case RouterType.Direct if connections.size > 1 ⇒ @@ -339,7 +234,12 @@ object Routing { throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required") new RoutedActorRef( - new RoutedProps(() ⇒ router, actorAddress, connections, RoutedProps.defaultTimeout, true)).start() + new RoutedProps( + () ⇒ router, + RoutedProps.defaultFailureDetectorFactory, + actorAddress, + connections, + RoutedProps.defaultTimeout, true)).start() } } @@ -377,7 +277,7 @@ abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) exte private[akka] class RoutedActorRef(val routedProps: RoutedProps) extends AbstractRoutedActorRef(routedProps) { - router.init(new RoutedActorRefConnections(routedProps.connections)) + router.init(new LocalFailureDetector(routedProps.connections)) def start(): this.type = synchronized[this.type] { if (_status == ActorRefInternals.UNSTARTED) @@ -393,38 +293,6 @@ private[akka] class RoutedActorRef(val routedProps: RoutedProps) } } } - - private class RoutedActorRefConnections extends RouterConnections { - - private val state = new AtomicReference[VersionedIterable[ActorRef]] - - def this(connectionIterable: Iterable[ActorRef]) = { - this() - state.set(new VersionedIterable[ActorRef](Long.MinValue, connectionIterable)) - } - - def version: Long = state.get.version - - def size: Int = state.get.iterable.size - - def versionedIterable = state.get - - @tailrec - final def remove(ref: ActorRef) = { - val oldState = state.get - - //remote the ref from the connections. - var newList = oldState.iterable.filter(currentActorRef ⇒ currentActorRef ne ref) - - if (newList.size != oldState.iterable.size) { - //one or more occurrences of the actorRef were removed, so we need to update the state. - - val newState = new VersionedIterable[ActorRef](oldState.version + 1, newList) - //if we are not able to update the state, we just try again. - if (!state.compareAndSet(oldState, newState)) remove(ref) - } - } - } } /** @@ -437,9 +305,9 @@ private[akka] class RoutedActorRef(val routedProps: RoutedProps) trait BasicRouter extends Router { @volatile - protected var connections: RouterConnections = _ + protected var connections: FailureDetector = _ - def init(connections: RouterConnections) = { + def init(connections: FailureDetector) = { this.connections = connections } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 64b965a18c..149982d4d1 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -15,9 +15,13 @@ import org.I0Itec.zkclient.exception._ import java.util.{ List ⇒ JList } import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } +import java.util.concurrent.{ CopyOnWriteArrayList, Callable, ConcurrentHashMap } +import javax.management.StandardMBean import java.net.InetSocketAddress + import scala.collection.mutable.ConcurrentMap import scala.collection.JavaConversions._ +import scala.annotation.tailrec import akka.util._ import duration._ @@ -30,17 +34,16 @@ import DeploymentConfig._ import akka.event.EventHandler import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher } -import akka.cluster._ -import akka.routing.RouterType - import akka.config.{ Config, Supervision } import Supervision._ import Config._ -import akka.serialization.{ Serialization, Serializer, ActorSerialization } +import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression } import ActorSerialization._ -import akka.serialization.Compression.LZF +import Compression.LZF +import akka.routing._ +import akka.cluster._ import akka.cluster.metrics._ import akka.cluster.zookeeper._ import ChangeListener._ @@ -51,11 +54,6 @@ import com.eaio.uuid.UUID import com.google.protobuf.ByteString -import java.util.concurrent.{ CopyOnWriteArrayList, Callable, ConcurrentHashMap } - -import annotation.tailrec -import javax.management.{ StandardMBean } - // FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down /** @@ -906,7 +904,8 @@ class DefaultClusterNode private[akka] ( /** * Creates an ActorRef with a Router to a set of clustered actors. */ - def ref(actorAddress: String, router: RouterType): ActorRef = ClusterActorRef.newRef(router, actorAddress, Actor.TIMEOUT) + def ref(actorAddress: String, router: RouterType, failureDetector: FailureDetectorType): ActorRef = + ClusterActorRef.newRef(actorAddress, router, failureDetector, Actor.TIMEOUT) /** * Returns the UUIDs of all actors checked out on this node. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index 05f66c35ec..e073189b55 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -9,8 +9,8 @@ import akka.util._ import akka.event.EventHandler import ReflectiveAccess._ import akka.routing._ -import RouterType._ import akka.cluster._ +import FailureDetector._ import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference @@ -24,23 +24,38 @@ import annotation.tailrec * @author Jonas Bonér */ object ClusterActorRef { + import FailureDetectorType._ + import RouterType._ - def newRef(routerType: RouterType, actorAddress: String, timeout: Long): ClusterActorRef = { + def newRef( + actorAddress: String, + routerType: RouterType, + failureDetectorType: FailureDetectorType, + timeout: Long): ClusterActorRef = { val routerFactory: () ⇒ Router = routerType match { case Direct ⇒ () ⇒ new DirectRouter - case Random ⇒ () ⇒ new RandomRouter() - case RoundRobin ⇒ () ⇒ new RoundRobinRouter() + case Random ⇒ () ⇒ new RandomRouter + case RoundRobin ⇒ () ⇒ new RoundRobinRouter case LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") case LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") case LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") + case Custom ⇒ sys.error("Router Custom not supported yet") + } + + val failureDetectorFactory: (Map[InetSocketAddress, ActorRef]) ⇒ FailureDetector = failureDetectorType match { + case RemoveConnectionOnFirstFailure ⇒ + (connections: Map[InetSocketAddress, ActorRef]) ⇒ new RemoveConnectionOnFirstFailureFailureDetector(connections) + case _ ⇒ + (connections: Map[InetSocketAddress, ActorRef]) ⇒ new LocalFailureDetector } new ClusterActorRef( RoutedProps() .withDeployId(actorAddress) .withTimeout(timeout) - .withRouter(routerFactory)).start() + .withRouter(routerFactory) + .withFailureDetector(failureDetectorFactory)).start() } /** @@ -67,6 +82,7 @@ private[akka] class ClusterActorRef(props: RoutedProps) extends AbstractRoutedAc ClusterModule.ensureEnabled() val addresses = Cluster.node.inetSocketAddressesForActor(address) + EventHandler.debug(this, "Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]" .format(address, router, Cluster.node.remoteServerAddress, addresses.map(_._2).mkString("\n\t"))) @@ -75,15 +91,19 @@ private[akka] class ClusterActorRef(props: RoutedProps) extends AbstractRoutedAc case (_, address) ⇒ Cluster.node.clusterActorRefs.put(address, this) } - val connections = new ClusterActorRefConnections((Map[InetSocketAddress, ActorRef]() /: addresses) { - case (map, (uuid, inetSocketAddress)) ⇒ map + (inetSocketAddress -> createRemoteActorRef(address, inetSocketAddress)) - }, props.connections) + val connections: FailureDetector = { + val remoteConnections = (Map[InetSocketAddress, ActorRef]() /: addresses) { + case (map, (uuid, inetSocketAddress)) ⇒ + map + (inetSocketAddress -> createRemoteActorRef(address, inetSocketAddress)) + } + props.failureDetectorFactory(remoteConnections) + } router.init(connections) def nrOfConnections: Int = connections.size - private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress): Unit = { + private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress) { connections.failOver(from, to) } @@ -108,91 +128,3 @@ private[akka] class ClusterActorRef(props: RoutedProps) extends AbstractRoutedAc } } } - -class ClusterActorRefConnections extends RouterConnections { - import ClusterActorRef._ - - private val state = new AtomicReference[State]() - - def this(clusteredConnections: Map[InetSocketAddress, ActorRef], explicitConnections: Iterable[ActorRef]) = { - this() - state.set(new State(Long.MinValue, clusteredConnections, explicitConnections)) - } - - def version: Long = state.get().version - - def versionedIterable = state.get - - def size: Int = state.get().iterable.size - - def stopAll() { - state.get().clusteredConnections.values foreach (_.stop()) // shut down all remote connections - } - - @tailrec - final def failOver(from: InetSocketAddress, to: InetSocketAddress): Unit = { - EventHandler.debug(this, "ClusterActorRef failover from [%s] to [%s]".format(from, to)) - - val oldState = state.get - var changed = false - val newMap = oldState.clusteredConnections map { - case (`from`, actorRef) ⇒ - changed = true - //actorRef.stop() - (to, createRemoteActorRef(actorRef.address, to)) - case other ⇒ other - } - - if (changed) { - //there was a state change, so we are now going to update the state. - val newState = new State(oldState.version + 1, newMap, oldState.explicitConnections) - - //if we are not able to update, the state, we are going to try again. - if (!state.compareAndSet(oldState, newState)) failOver(from, to) - } - } - - @tailrec - final def remove(deadRef: ActorRef) = { - EventHandler.debug(this, "ClusterActorRef remove [%s]".format(deadRef.uuid)) - - val oldState = state.get() - - var changed = false - - //remote the deadRef from the clustered-connections. - var newConnections = Map[InetSocketAddress, ActorRef]() - oldState.clusteredConnections.keys.foreach( - address ⇒ { - val actorRef: ActorRef = oldState.clusteredConnections.get(address).get - if (actorRef ne deadRef) { - newConnections = newConnections + ((address, actorRef)) - } else { - changed = true - } - }) - - //remove the deadRef also from the explicit connections. - var newExplicitConnections = oldState.explicitConnections.filter( - actorRef ⇒ - if (actorRef == deadRef) { - changed = true - false - } else { - true - }) - - if (changed) { - //one or more occurrances of the actorRef were removed, so we need to update the state. - val newState = new State(oldState.version + 1, newConnections, newExplicitConnections) - - //if we are not able to update the state, we just try again. - if (!state.compareAndSet(oldState, newState)) remove(deadRef) - } - } - - class State(version: Long = Integer.MIN_VALUE, - val clusteredConnections: Map[InetSocketAddress, ActorRef], - val explicitConnections: Iterable[ActorRef]) - extends VersionedIterable[ActorRef](version, explicitConnections ++ clusteredConnections.values) -} diff --git a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala index 0e6a7916b4..0dec42f168 100644 --- a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala @@ -4,32 +4,36 @@ package akka.cluster -import akka.actor.{ Actor, Props } +import akka.actor.{ Actor, ActorRef, Props } import Actor._ import akka.cluster._ +import akka.routing._ +import akka.event.EventHandler import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher } import akka.util.ListenerManagement import scala.collection.mutable.{ HashMap, Set } +import scala.annotation.tailrec import java.net.InetSocketAddress +import java.util.concurrent.atomic.AtomicReference object FailureDetector { private sealed trait FailureDetectorEvent - private case class Register(strategy: FailOverStrategy, address: InetSocketAddress) extends FailureDetectorEvent - private case class Unregister(strategy: FailOverStrategy, address: InetSocketAddress) extends FailureDetectorEvent + private case class Register(strategy: RemoteFailureListener, address: InetSocketAddress) extends FailureDetectorEvent + private case class Unregister(strategy: RemoteFailureListener, address: InetSocketAddress) extends FailureDetectorEvent private[akka] val registry = actorOf(Props(new Registry).copy(dispatcher = new PinnedDispatcher(), localOnly = true)) - def register(strategy: FailOverStrategy, address: InetSocketAddress) = registry ! Register(strategy, address) + def register(strategy: RemoteFailureListener, address: InetSocketAddress) = registry ! Register(strategy, address) - def unregister(strategy: FailOverStrategy, address: InetSocketAddress) = registry ! Unregister(strategy, address) + def unregister(strategy: RemoteFailureListener, address: InetSocketAddress) = registry ! Unregister(strategy, address) private class Registry extends Actor { - val strategies = new HashMap[InetSocketAddress, Set[FailOverStrategy]]() { - override def default(k: InetSocketAddress) = Set.empty[FailOverStrategy] + val strategies = new HashMap[InetSocketAddress, Set[RemoteFailureListener]]() { + override def default(k: InetSocketAddress) = Set.empty[RemoteFailureListener] } def receive = { @@ -47,72 +51,161 @@ object FailureDetector { case _ ⇒ //ignore other } } - - trait FailOverStrategy { - - def notify(event: RemoteLifeCycleEvent) = event match { - case RemoteClientWriteFailed(request, cause, client, address) ⇒ - remoteClientWriteFailed(request, cause, client, address) - println("--------->>> RemoteClientWriteFailed") - case RemoteClientError(cause, client, address) ⇒ - println("--------->>> RemoteClientError") - remoteClientError(cause, client, address) - case RemoteClientDisconnected(client, address) ⇒ - remoteClientDisconnected(client, address) - println("--------->>> RemoteClientDisconnected") - case RemoteClientShutdown(client, address) ⇒ - remoteClientShutdown(client, address) - println("--------->>> RemoteClientShutdown") - case RemoteServerWriteFailed(request, cause, server, clientAddress) ⇒ - remoteServerWriteFailed(request, cause, server, clientAddress) - case RemoteServerError(cause, server) ⇒ - remoteServerError(cause, server) - case RemoteServerShutdown(server) ⇒ - remoteServerShutdown(server) - } - - def remoteClientWriteFailed(request: AnyRef, cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {} - - def remoteClientError(cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {} - - def remoteClientDisconnected(client: RemoteClientModule, address: InetSocketAddress) {} - - def remoteClientShutdown(client: RemoteClientModule, address: InetSocketAddress) {} - - def remoteServerWriteFailed(request: AnyRef, cause: Throwable, server: RemoteServerModule, clientAddress: Option[InetSocketAddress]) {} - - def remoteServerError(cause: Throwable, server: RemoteServerModule) {} - - def remoteServerShutdown(server: RemoteServerModule) {} - } - - trait RemoveConnectionOnFirstFailureFailOverStrategy extends FailOverStrategy { - - override def remoteClientWriteFailed(request: AnyRef, cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) { - removeConnection(address) - } - - override def remoteClientError(cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) { - removeConnection(address) - } - - override def remoteClientDisconnected(client: RemoteClientModule, address: InetSocketAddress) { - removeConnection(address) - } - - override def remoteClientShutdown(client: RemoteClientModule, address: InetSocketAddress) { - removeConnection(address) - } - - private def removeConnection(address: InetSocketAddress) = {} //connections.get(address) foreach (remove(connection)) - } - - trait LinearBackoffFailOverStrategy extends FailOverStrategy { - } - - trait ExponentialBackoffFailOverStrategy extends FailOverStrategy { - } - - trait CircuitBreakerFailOverStrategy extends FailOverStrategy { - } } + +abstract class FailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef]) extends FailureDetector { + import ClusterActorRef._ + + // type C + + private val state = new AtomicReference[State]() + + state.set(new State(Long.MinValue, initialConnections)) + + def version: Long = state.get().version + + def versionedIterable = state.get + + def size: Int = state.get.iterable.size + + def connections: Map[InetSocketAddress, ActorRef] = state.get.connections + + def stopAll() { + state.get().connections.values foreach (_.stop()) // shut down all remote connections + } + + @tailrec + final def failOver(from: InetSocketAddress, to: InetSocketAddress) { + EventHandler.debug(this, "ClusterActorRef failover from [%s] to [%s]".format(from, to)) + + val oldState = state.get + var changed = false + val newMap = oldState.connections map { + case (`from`, actorRef) ⇒ + changed = true + //actorRef.stop() + (to, createRemoteActorRef(actorRef.address, to)) + case other ⇒ other + } + + if (changed) { + //there was a state change, so we are now going to update the state. + val newState = new State(oldState.version + 1, newMap) + + //if we are not able to update, the state, we are going to try again. + if (!state.compareAndSet(oldState, newState)) failOver(from, to) + } + } + + @tailrec + final def remove(deadRef: ActorRef) { + EventHandler.debug(this, "ClusterActorRef remove [%s]".format(deadRef.uuid)) + + val oldState = state.get() + + var changed = false + + //remote the deadRef from the clustered-connections. + var newConnections = Map.empty[InetSocketAddress, ActorRef] + oldState.connections.keys foreach { address ⇒ + val actorRef: ActorRef = oldState.connections.get(address).get + if (actorRef ne deadRef) { + newConnections = newConnections + ((address, actorRef)) + } else { + changed = true + } + } + + if (changed) { + //one or more occurrances of the actorRef were removed, so we need to update the state. + val newState = new State(oldState.version + 1, newConnections) + + //if we are not able to update the state, we just try again. + if (!state.compareAndSet(oldState, newState)) remove(deadRef) + } + } + + class State(version: Long = Integer.MIN_VALUE, + val connections: Map[InetSocketAddress, ActorRef]) + extends VersionedIterable[ActorRef](version, connections.values) + + // class State[C](version: Long = Integer.MIN_VALUE, + // val connections: Map[InetSocketAddress, ActorRef], + // val explicitConnections: Iterable[ActorRef], + // val context: C) + // extends VersionedIterable[ActorRef](version, explicitConnections ++ connections.values) +} + +trait RemoteFailureListener { + + def notify(event: RemoteLifeCycleEvent) = event match { + case RemoteClientWriteFailed(request, cause, client, address) ⇒ + remoteClientWriteFailed(request, cause, client, address) + println("--------->>> RemoteClientWriteFailed") + case RemoteClientError(cause, client, address) ⇒ + println("--------->>> RemoteClientError") + remoteClientError(cause, client, address) + case RemoteClientDisconnected(client, address) ⇒ + remoteClientDisconnected(client, address) + println("--------->>> RemoteClientDisconnected") + case RemoteClientShutdown(client, address) ⇒ + remoteClientShutdown(client, address) + println("--------->>> RemoteClientShutdown") + case RemoteServerWriteFailed(request, cause, server, clientAddress) ⇒ + remoteServerWriteFailed(request, cause, server, clientAddress) + case RemoteServerError(cause, server) ⇒ + remoteServerError(cause, server) + case RemoteServerShutdown(server) ⇒ + remoteServerShutdown(server) + } + + def remoteClientWriteFailed( + request: AnyRef, cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {} + + def remoteClientError(cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {} + + def remoteClientDisconnected(client: RemoteClientModule, address: InetSocketAddress) {} + + def remoteClientShutdown(client: RemoteClientModule, address: InetSocketAddress) {} + + def remoteServerWriteFailed( + request: AnyRef, cause: Throwable, server: RemoteServerModule, clientAddress: Option[InetSocketAddress]) {} + + def remoteServerError(cause: Throwable, server: RemoteServerModule) {} + + def remoteServerShutdown(server: RemoteServerModule) {} +} + +class RemoveConnectionOnFirstFailureFailureDetector(initialConnections: Map[InetSocketAddress, ActorRef]) + extends FailureDetectorBase(initialConnections) + with RemoteFailureListener { + + override def remoteClientWriteFailed( + request: AnyRef, cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) { + removeConnection(address) + } + + override def remoteClientError(cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) { + removeConnection(address) + } + + override def remoteClientDisconnected(client: RemoteClientModule, address: InetSocketAddress) { + removeConnection(address) + } + + override def remoteClientShutdown(client: RemoteClientModule, address: InetSocketAddress) { + removeConnection(address) + } + + private def removeConnection(address: InetSocketAddress) = + connections.get(address) foreach { connection ⇒ remove(connection) } +} + +trait LinearBackoffRemoteFailureListener extends RemoteFailureListener { +} + +trait ExponentialBackoffRemoteFailureListener extends RemoteFailureListener { +} + +trait CircuitBreakerRemoteFailureListener extends RemoteFailureListener { +} \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index 256d5e48ef..f785723bec 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -17,7 +17,6 @@ import akka.actor._ import DeploymentConfig.ReplicationScheme import akka.event.EventHandler import akka.dispatch.{ DefaultPromise, Promise, MessageInvocation } -import akka.cluster.MessageSerializer import akka.cluster.zookeeper._ import akka.serialization.ActorSerialization._ import akka.serialization.Compression.LZF diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index 2603d4e5a3..9b05194ead 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -60,7 +60,7 @@ object Pi extends App { // wrap them with a load-balancing router val router = Routing.actorOf( RoutedProps.default - .withRoundRobinRouter() + .withRoundRobinRouter .withConnections(workers) .withDeployId("pi")) diff --git a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala index d9e133df98..6057ef57d1 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala @@ -56,7 +56,7 @@ object Pi extends App { val router = Routing.actorOf( RoutedProps.apply() .withConnections(workers) - .withRoundRobinRouter() + .withRoundRobinRouter .withDeployId("pi")) // phase 1, can accept a Calculate message