diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index 388ce4bba4..7ba0fd96c9 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -20,6 +20,7 @@ class DeployerSpec extends WordSpec with MustMatchers { "service-ping", None, LeastCPU, + ReplicationFactor(3), BannagePeriodFailureDetector(10), RemoteScope("localhost", 2552)))) // ClusterScope( diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index fbe3965ca8..ec7f339625 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -7,6 +7,7 @@ package akka.actor import DeploymentConfig._ import akka.event.EventHandler import akka.AkkaException +import akka.routing._ /** * Interface for all ActorRef providers to implement. @@ -59,7 +60,7 @@ private[akka] class ActorRefProviders( providers match { case Nil ⇒ None case provider :: rest ⇒ - provider.actorOf(props, address) match { //WARNING FIXME RACE CONDITION NEEDS TO BE SOLVED + provider.actorOf(props, address) match { case None ⇒ actorOf(props, address, rest) // recur case ref ⇒ ref } @@ -124,8 +125,8 @@ private[akka] class ActorRefProviders( class LocalActorRefProvider extends ActorRefProvider { import java.util.concurrent.ConcurrentHashMap import akka.dispatch.Promise + import com.eaio.uuid.UUID - // FIXME who evicts this registry, and when? Should it be used instead of ActorRegistry? private val actors = new ConcurrentHashMap[String, Promise[Option[ActorRef]]] def actorOf(props: Props, address: String): Option[ActorRef] = actorOf(props, address, false) @@ -145,13 +146,31 @@ class LocalActorRefProvider extends ActorRefProvider { if (oldFuture eq null) { // we won the race -- create the actor and resolve the future - def newActor() = Some(new LocalActorRef(props, address, systemService)) - val actor = try { Deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor - case Some(Deploy(_, _, router, _, LocalScope)) ⇒ newActor() // create a local actor - case None ⇒ newActor() // create a local actor - case _ ⇒ None // non-local actor + case Some(Deploy(_, _, router, nrOfInstances, _, LocalScope)) ⇒ + val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(router) match { + case RouterType.Direct ⇒ () ⇒ new DirectRouter + case RouterType.Random ⇒ () ⇒ new RandomRouter + case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter + case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") + case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") + case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") + case RouterType.Custom ⇒ sys.error("Router Custom not supported yet") + } + val connections: Iterable[ActorRef] = + if (nrOfInstances.factor > 0) + Vector.fill(nrOfInstances.factor)(new LocalActorRef(props, new UUID().toString, systemService)) + else Nil + + Some(Routing.actorOf(RoutedProps( + routerFactory = routerFactory, + connections = connections))) + + case None ⇒ + Some(new LocalActorRef(props, address, systemService)) // create a local actor + + case _ ⇒ None // non-local actor } } catch { case e: Exception ⇒ diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 32718c7225..27b6197140 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -46,7 +46,7 @@ object Deployer extends ActorDeployer { def deploy(deployment: Deploy): Unit = instance.deploy(deployment) def isLocal(deployment: Deploy): Boolean = deployment match { - case Deploy(_, _, _, _, LocalScope) | Deploy(_, _, _, _, _: LocalScope) ⇒ true + case Deploy(_, _, _, _, _, LocalScope) | Deploy(_, _, _, _, _, _: LocalScope) ⇒ true case _ ⇒ false } @@ -122,7 +122,7 @@ object Deployer extends ActorDeployer { val addressPath = "akka.actor.deployment." + address configuration.getSection(addressPath) match { case None ⇒ - Some(Deploy(address, None, Direct, RemoveConnectionOnFirstFailureLocalFailureDetector, LocalScope)) + Some(Deploy(address, None, Direct, ReplicationFactor(1), RemoveConnectionOnFirstFailureLocalFailureDetector, LocalScope)) case Some(addressConfig) ⇒ @@ -144,6 +144,29 @@ object Deployer extends ActorDeployer { CustomRouter(_)) } + // -------------------------------- + // akka.actor.deployment.
.replication-factor + // -------------------------------- + val nrOfInstances = { + if (router == Direct) new ReplicationFactor(1) + else { + addressConfig.getAny("replication-factor", "0") match { + case "auto" ⇒ AutoReplicationFactor + case "0" ⇒ ZeroReplicationFactor + case nrOfReplicas: String ⇒ + try { + new ReplicationFactor(nrOfReplicas.toInt) + } catch { + case e: Exception ⇒ + throw new ConfigurationException( + "Config option [" + addressPath + + ".cluster.replication-factor] needs to be either [\"auto\"] or [0-N] - was [" + + nrOfReplicas + "]") + } + } + } + } + // -------------------------------- // akka.actor.deployment..failure-detector.xxx // -------------------------------- @@ -210,7 +233,7 @@ object Deployer extends ActorDeployer { val hostname = remoteConfig.getString("hostname", "localhost") val port = remoteConfig.getInt("port", 2552) - Some(Deploy(address, recipe, router, failureDetector, RemoteScope(hostname, port))) + Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, RemoteScope(hostname, port))) case None ⇒ // check for 'cluster' config section @@ -219,7 +242,7 @@ object Deployer extends ActorDeployer { // -------------------------------- addressConfig.getSection("cluster") match { case None ⇒ - Some(Deploy(address, recipe, router, RemoveConnectionOnFirstFailureLocalFailureDetector, LocalScope)) // deploy locally + Some(Deploy(address, recipe, router, nrOfInstances, RemoveConnectionOnFirstFailureLocalFailureDetector, LocalScope)) // deploy locally case Some(clusterConfig) ⇒ @@ -251,35 +274,12 @@ object Deployer extends ActorDeployer { } } - // -------------------------------- - // akka.actor.deployment..cluster.replicas - // -------------------------------- - val replicationFactor = { - if (router == Direct) new ReplicationFactor(1) - else { - clusterConfig.getAny("replication-factor", "0") match { - case "auto" ⇒ AutoReplicationFactor - case "0" ⇒ ZeroReplicationFactor - case nrOfReplicas: String ⇒ - try { - new ReplicationFactor(nrOfReplicas.toInt) - } catch { - case e: Exception ⇒ - throw new ConfigurationException( - "Config option [" + addressPath + - ".cluster.replicas] needs to be either [\"auto\"] or [0-N] - was [" + - nrOfReplicas + "]") - } - } - } - } - // -------------------------------- // akka.actor.deployment..cluster.replication // -------------------------------- clusterConfig.getSection("replication") match { case None ⇒ - Some(Deploy(address, recipe, router, failureDetector, ClusterScope(preferredNodes, replicationFactor, Transient))) + Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, ClusterScope(preferredNodes, Transient))) case Some(replicationConfig) ⇒ val storage = replicationConfig.getString("storage", "transaction-log") match { @@ -298,7 +298,7 @@ object Deployer extends ActorDeployer { ".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" + unknown + "]") } - Some(Deploy(address, recipe, router, failureDetector, ClusterScope(preferredNodes, replicationFactor, Replication(storage, strategy)))) + Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, ClusterScope(preferredNodes, Replication(storage, strategy)))) } } } @@ -319,7 +319,7 @@ object Deployer extends ActorDeployer { } /** - * TODO: Improved documentation + * Simple local deployer, only for internal use. * * @author Jonas Bonér */ @@ -335,15 +335,7 @@ object LocalDeployer extends ActorDeployer { } private[akka] def deploy(deployment: Deploy) { - deployments.putIfAbsent(deployment.address, deployment) /* match { - case null ⇒ - deployment match { - case Deploy(address, Some(recipe), routing, _) ⇒ Actor.actorOf(recipe.implementationClass, address) //FIXME use routing? - case _ ⇒ - } - case `deployment` ⇒ //Already deployed TODO should it be like this? - case preexists ⇒ Deployer.throwDeploymentBoundException(deployment) - }*/ + deployments.putIfAbsent(deployment.address, deployment) } private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = Option(deployments.get(address)) diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala index 8f33e300a6..4e1e6853ae 100644 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala @@ -23,6 +23,7 @@ object DeploymentConfig { address: String, recipe: Option[ActorRecipe], routing: Routing = Direct, + nrOfInstances: ReplicationFactor = ZeroReplicationFactor, failureDetector: FailureDetector = RemoveConnectionOnFirstFailureLocalFailureDetector, scope: Scope = LocalScope) { Address.validate(address) @@ -76,7 +77,6 @@ object DeploymentConfig { sealed trait Scope case class ClusterScope( preferredNodes: Iterable[Home] = Vector(Node(Config.nodename)), - replicas: ReplicationFactor = ZeroReplicationFactor, replication: ReplicationScheme = Transient) extends Scope case class RemoteScope( @@ -206,7 +206,7 @@ object DeploymentConfig { } def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match { - case Deploy(_, _, _, _, ClusterScope(_, _, replicationScheme)) ⇒ Some(replicationScheme) + case Deploy(_, _, _, _, _, ClusterScope(_, replicationScheme)) ⇒ Some(replicationScheme) case _ ⇒ None } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 809adb5c62..826721f7b6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -31,7 +31,6 @@ class RemoteActorRefProvider extends ActorRefProvider { import java.util.concurrent.ConcurrentHashMap import akka.dispatch.Promise - // FIXME who evicts this registry, and when? Should it be used instead of ActorRegistry? private val actors = new ConcurrentHashMap[String, Promise[Option[ActorRef]]] private val failureDetector = new BannagePeriodFailureDetector(timeToBan = 60 seconds) // FIXME make timeToBan configurable @@ -45,7 +44,7 @@ class RemoteActorRefProvider extends ActorRefProvider { if (oldFuture eq null) { // we won the race -- create the actor and resolve the future val actor = try { Deployer.lookupDeploymentFor(address) match { - case Some(Deploy(_, _, router, _, RemoteScope(host, port))) ⇒ + case Some(Deploy(_, _, router, nrOfInstances, _, RemoteScope(host, port))) ⇒ // FIXME create RoutedActorRef if 'router' is specified val serverAddress = Remote.address diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 02f699c393..41d2a11b09 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -61,7 +61,7 @@ akka { deployment { - service-ping { # stateless actor with replication factor 3 and round-robin load-balancer + service-ping { # deployment id pattern router = "least-cpu" # routing (load-balance) scheme to use # available: "direct", "round-robin", "random", @@ -70,6 +70,12 @@ akka { # default is "direct"; # if 'replication' is used then the only available router is "direct" + # replication-factor = 3 # number of actor instances in the cluster + # available: positive integer (0-N) or the string "auto" for auto-scaling + # if "auto" is used then 'home' has no meaning + # default is '0', meaning no replicas; + # if the "direct" router is used then this element is ignored (always '1') + failure-detector { # failure detection scheme to use bannage-period { # available: remove-connection-on-first-local-failure {} time-to-ban = 10 # remove-connection-on-first-failure {} @@ -100,11 +106,6 @@ akka { # defined as node name # available: "node: