diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index edada342e7..a15068bccd 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -518,7 +518,8 @@ trait RoundRobinLike { this: RouterConfig ⇒ def getNext(): ActorRef = { val _routees = routeeProvider.routees - _routees((next.getAndIncrement % _routees.size).asInstanceOf[Int]) + if (_routees.isEmpty) routeeProvider.context.system.deadLetters + else _routees((next.getAndIncrement % _routees.size).asInstanceOf[Int]) } { @@ -634,7 +635,8 @@ trait RandomLike { this: RouterConfig ⇒ def getNext(): ActorRef = { val _routees = routeeProvider.routees - _routees(ThreadLocalRandom.current.nextInt(_routees.size)) + if (_routees.isEmpty) routeeProvider.context.system.deadLetters + else _routees(ThreadLocalRandom.current.nextInt(_routees.size)) } { diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 30e425dafd..556c39898d 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -106,4 +106,6 @@ akka { reset-timeout = 30 s } } + + actor.deployment.default.cluster = off } diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index 202eab4dd7..7efe1f0f1e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -5,7 +5,6 @@ package akka.cluster import akka.actor.{ ActorSystem, Address, ExtendedActorSystem } -import akka.remote.RemoteActorRefProvider import akka.event.Logging import scala.collection.immutable.Map import scala.annotation.tailrec diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index c76b637164..0433a46ba3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -73,15 +73,13 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec import ClusterEvent._ - if (!system.provider.isInstanceOf[RemoteActorRefProvider]) - throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration") - - private val remote: RemoteActorRefProvider = system.provider.asInstanceOf[RemoteActorRefProvider] + if (!system.provider.isInstanceOf[ClusterActorRefProvider]) + throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'ClusterActorRefProvider' enabled in the configuration") val settings = new ClusterSettings(system.settings.config, system.name) import settings._ - val selfAddress = remote.transport.address + val selfAddress = system.provider.asInstanceOf[ClusterActorRefProvider].transport.address private val _isRunning = new AtomicBoolean(true) private val log = Logging(system, "Cluster") diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala new file mode 100644 index 0000000000..bea6d6f57f --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import akka.actor.ActorSystem +import akka.actor.DynamicAccess +import akka.actor.Scheduler +import akka.event.EventStream +import akka.remote.RemoteActorRefProvider +import akka.remote.RemoteDeployer +import akka.actor.Deploy +import com.typesafe.config.Config +import akka.ConfigurationException +import akka.actor.NoScopeGiven +import akka.routing.RemoteRouterConfig +import akka.cluster.routing.ClusterRouterConfig +import akka.actor.Scope + +class ClusterActorRefProvider( + _systemName: String, + _settings: ActorSystem.Settings, + _eventStream: EventStream, + _scheduler: Scheduler, + _dynamicAccess: DynamicAccess) extends RemoteActorRefProvider( + _systemName, _settings, _eventStream, _scheduler, _dynamicAccess) { + + override val deployer: RemoteDeployer = new ClusterDeployer(settings, dynamicAccess) + +} + +private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extends RemoteDeployer(_settings, _pm) { + override def parseConfig(path: String, config: Config): Option[Deploy] = { + super.parseConfig(path, config) match { + case d @ Some(deploy) ⇒ + if (deploy.config.getBoolean("cluster")) { + if (deploy.scope != NoScopeGiven) + throw new ConfigurationException("Cluster deployment can't be combined with scope [%s]".format(deploy.scope)) + if (deploy.routerConfig.isInstanceOf[RemoteRouterConfig]) + throw new ConfigurationException("Cluster deployment can't be combined with [%s]".format(deploy.routerConfig)) + Some(deploy.copy(routerConfig = ClusterRouterConfig(deploy.routerConfig))) + } else d + case None ⇒ None + } + } +} + +@SerialVersionUID(1L) +abstract class ClusterScope extends Scope + +/** + * Cluster aware scope of a [[akka.actor.Deploy]] + */ +case object ClusterScope extends ClusterScope { + /** + * Java API: get the singleton instance + */ + def getInstance = this + + def withFallback(other: Scope): Scope = this +} diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala new file mode 100644 index 0000000000..5d58d0ffac --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster.routing + +import java.util.concurrent.atomic.AtomicInteger +import scala.collection.immutable.SortedSet +import com.typesafe.config.ConfigFactory + +import akka.ConfigurationException +import akka.actor.Actor +import akka.actor.ActorContext +import akka.actor.ActorRef +import akka.actor.ActorSystemImpl +import akka.actor.Address +import akka.actor.Deploy +import akka.actor.InternalActorRef +import akka.actor.Props +import akka.actor.SupervisorStrategy +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.cluster.Member +import akka.cluster.MemberStatus +import akka.remote.RemoteScope +import akka.routing.Resizer +import akka.routing.Route +import akka.routing.RouteeProvider +import akka.routing.Router +import akka.routing.RouterConfig + +/** + * [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes. + * Delegates other duties to the local [[akka.routing.RouterConfig]], + * which makes it possible to mix this with the built-in routers such as + * [[akka.routing.RoundRobinRouter]] or custom routers. + */ +case class ClusterRouterConfig(local: RouterConfig) extends RouterConfig { + + override def createRouteeProvider(context: ActorContext) = new ClusterRouteeProvider(context, resizer) + + override def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = { + local.createRoute(routeeProps, routeeProvider) + } + + override def createActor(): Router = local.createActor() + + override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy + + override def routerDispatcher: String = local.routerDispatcher + + override def resizer: Option[Resizer] = local.resizer + + override def withFallback(other: RouterConfig): RouterConfig = other match { + case ClusterRouterConfig(local) ⇒ copy(local = this.local.withFallback(local)) + case _ ⇒ copy(local = this.local.withFallback(other)) + } +} + +/** + * Factory and registry for routees of the router. + * Deploys new routees on the cluster nodes. + */ +class ClusterRouteeProvider(_context: ActorContext, _resizer: Option[Resizer]) + extends RouteeProvider(_context, _resizer) { + + // need this counter as instance variable since Resizer may call createRoutees several times + private val childNameCounter = new AtomicInteger + + override def createRoutees(props: Props, nrOfInstances: Int, _routees: Iterable[String]): IndexedSeq[ActorRef] = { + val nodes = upNodes + if (_routees.nonEmpty) { + throw new ConfigurationException("Cluster deployment can not be combined with routees for [%s]" + format context.self.path.toString) + } else if (nodes.isEmpty) { + IndexedSeq.empty + } else { + val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 + // FIXME We could count number of routees per node and select nodes with least routees first + val nodesIter: Iterator[Address] = Stream.continually(nodes).flatten.iterator + IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { + val name = "c" + childNameCounter.incrementAndGet + val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodesIter.next)) + impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, + systemService = false, Some(deploy), lookupDeploy = false, async = false) + }) + } + } + + // FIXME experimental hack to let the cluster initialize + // What should we do before we have full cluster information (startup phase)? + Cluster(context.system).readView + Thread.sleep(2000) + + import Member.addressOrdering + @volatile + private var upNodes: SortedSet[Address] = Cluster(context.system).readView.members.collect { + case m if m.status == MemberStatus.Up ⇒ m.address + } + + // create actor that subscribes to the cluster eventBus + private val eventBusListener: ActorRef = { + + // FIXME is this allowed, are we inside or outside of the actor? + context.actorOf(Props(new Actor { + override def preStart(): Unit = Cluster(context.system).subscribe(self, classOf[ClusterDomainEvent]) + override def postStop(): Unit = Cluster(context.system).unsubscribe(self) + + def receive = { + case s: CurrentClusterState ⇒ + upNodes = s.members.collect { case m if m.status == MemberStatus.Up ⇒ m.address } + + case MemberUp(m) ⇒ + upNodes += m.address + // FIXME Here we could trigger a rebalance, by counting number of routees per node and unregister + // routees from nodes with many routees and deploy on this new node instead + + case other: MemberEvent ⇒ + // other events means that it is no longer interesting, such as + // MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved + upNodes -= other.member.address + + // FIXME Should we deploy new routees corresponding to the ones that goes away here? + // or is that a job for a special Cluster Resizer? + + } + + }), name = "cluster-listener") + } + +} + diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index dd880a76d8..bc583cb809 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -44,6 +44,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { publish-stats-interval = 0 s # always, when it happens } akka.loglevel = INFO + akka.actor.provider = akka.cluster.ClusterActorRefProvider akka.actor.default-dispatcher.fork-join-executor { # when using nodes-per-datacenter=10 we need some extra # threads to keep up with netty connect blocking diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 93c1f921ae..72535bd4eb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -21,6 +21,7 @@ import akka.actor.RootActorPath object MultiNodeClusterSpec { def clusterConfig: Config = ConfigFactory.parseString(""" + akka.actor.provider = akka.cluster.ClusterActorRefProvider akka.cluster { auto-join = on auto-down = off diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index e1b3571bd2..0a2872b186 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -24,6 +24,7 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { // Note that this test uses default configuration, // not MultiNodeClusterSpec.clusterConfig commonConfig(ConfigFactory.parseString(""" + akka.actor.provider = akka.cluster.ClusterActorRefProvider akka.cluster { auto-join = off } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala new file mode 100644 index 0000000000..80370278ec --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala @@ -0,0 +1,114 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster.routing + +import language.postfixOps +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.PoisonPill +import akka.actor.Address +import scala.concurrent.Await +import akka.pattern.ask +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.routing.Broadcast +import akka.routing.RoundRobinRouter +import akka.routing.RoutedActorRef +import akka.testkit._ +import scala.concurrent.util.duration._ +import akka.cluster.MultiNodeClusterSpec +import com.typesafe.config.ConfigFactory +import akka.cluster.FailureDetectorPuppetStrategy +import akka.cluster.Cluster + +object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig { + + class SomeActor extends Actor { + def receive = { + case "hit" ⇒ sender ! self + } + } + + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.actor.deployment { + /service-hello { + router = round-robin + nr-of-instances = 3 + cluster = on + } + } + """)). + withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class ClusterRoundRobinRoutedActorMultiJvmNode1 extends ClusterRoundRobinRoutedActorSpec with FailureDetectorPuppetStrategy +class ClusterRoundRobinRoutedActorMultiJvmNode2 extends ClusterRoundRobinRoutedActorSpec with FailureDetectorPuppetStrategy +class ClusterRoundRobinRoutedActorMultiJvmNode3 extends ClusterRoundRobinRoutedActorSpec with FailureDetectorPuppetStrategy +class ClusterRoundRobinRoutedActorMultiJvmNode4 extends ClusterRoundRobinRoutedActorSpec with FailureDetectorPuppetStrategy + +abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRoundRobinRoutedActorMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender with DefaultTimeout { + import ClusterRoundRobinRoutedActorMultiJvmSpec._ + + // sorted in the order used by the cluster + lazy val sortedRoles = Seq(first, second, third, fourth).sorted + + // FIXME make this use of Cluster(system) more easy to use in tests + override def cluster: Cluster = Cluster(system) + + "A cluster router configured with a RoundRobin router" must { + "start cluster" taggedAs LongRunningTest in { + awaitClusterUp(first, second, third, fourth) + enterBarrier("after-1") + } + + "be locally instantiated on a cluster node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in { + + runOn(sortedRoles.dropRight(1): _*) { + enterBarrier("start", "broadcast-end", "end") + } + + runOn(sortedRoles.last) { + enterBarrier("start") + val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello") + actor.isInstanceOf[RoutedActorRef] must be(true) + + val connectionCount = 3 + val iterationCount = 10 + + for (i ← 0 until iterationCount; k ← 0 until connectionCount) { + actor ! "hit" + } + + val replies: Map[Address, Int] = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) { + case ref: ActorRef ⇒ ref.path.address + }).foldLeft(Map(node(first).address -> 0, node(second).address -> 0, node(third).address -> 0)) { + case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1)) + } + + enterBarrier("broadcast-end") + actor ! Broadcast(PoisonPill) + + enterBarrier("end") + replies.values foreach { _ must be(iterationCount) } + replies.get(node(fourth).address) must be(None) + + // shut down the actor before we let the other node(s) shut down so we don't try to send + // "Terminate" to a shut down node + system.stop(actor) + } + + enterBarrier("after-2") + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 908b7298fd..a2a55ac2de 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -12,7 +12,7 @@ import scala.concurrent.util.Duration @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class AccrualFailureDetectorSpec extends AkkaSpec(""" - actor.provider = "akka.remote.RemoteActorRefProvider" + actor.provider = "akka.cluster.ClusterActorRefProvider" akka.loglevel = "INFO" """) { diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 73364b853e..3086339517 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -15,7 +15,6 @@ import akka.testkit.ImplicitSender import akka.actor.ExtendedActorSystem import akka.actor.Address import akka.cluster.InternalClusterAction._ -import akka.remote.RemoteActorRefProvider import java.lang.management.ManagementFactory import javax.management.ObjectName @@ -27,7 +26,7 @@ object ClusterSpec { periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks publish-stats-interval = 0 s # always, when it happens } - akka.actor.provider = "akka.remote.RemoteActorRefProvider" + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.remote.netty.port = 0 # akka.loglevel = DEBUG """ @@ -39,7 +38,7 @@ object ClusterSpec { class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { import ClusterSpec._ - val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address + val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[ClusterActorRefProvider].transport.address val failureDetector = new FailureDetectorPuppet(system)