diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala index 3b9802d7fd..cdfd040ace 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -205,7 +205,7 @@ case class ConsistentHashingRouter( * that can't be defined in configuration. */ override def withFallback(other: RouterConfig): RouterConfig = other match { - case _: FromConfig ⇒ this + case _: FromConfig | _: NoRouter ⇒ this case otherRouter: ConsistentHashingRouter ⇒ val useResizer = if (this.resizer.isEmpty && otherRouter.resizer.isDefined) otherRouter.resizer diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 60e69c1984..fa6860a1a8 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -9,7 +9,8 @@ akka { cluster { # Initial contact points of the cluster. Nodes to join at startup if auto-join = on. - # Comma separated full URIs defined by a string on the form of "akka://system@hostname:port" + # Comma separated full URIs defined by a string on the form of + # "akka://system@hostname:port" # Leave as empty if the node should be a singleton cluster. seed-nodes = [] @@ -20,11 +21,15 @@ akka { # If seed-nodes is empty it will join itself and become a single node cluster. auto-join = on - # Should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? - # Using auto-down implies that two separate clusters will automatically be formed in case of - # network partition. + # Should the 'leader' in the cluster be allowed to automatically mark unreachable + # nodes as DOWN? + # Using auto-down implies that two separate clusters will automatically be formed + # in case of network partition. auto-down = off + # Enable or disable JMX MBeans for management of the cluster + jmx.enabled = on + # how long should the node wait before starting the periodic tasks maintenance tasks? periodic-tasks-initial-delay = 1s @@ -37,18 +42,20 @@ akka { # how often should the leader perform maintenance tasks? leader-actions-interval = 1s - # how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring? + # how often should the node move nodes, marked as unreachable by the failure detector, + # out of the membership ring? unreachable-nodes-reaper-interval = 1s # How often the current internal stats should be published. # A value of 0 s can be used to always publish the stats, when it happens. publish-stats-interval = 10s - # A joining node stops sending heartbeats to the node to join if it hasn't become member - # of the cluster within this deadline. + # A joining node stops sending heartbeats to the node to join if it hasn't + # become member of the cluster within this deadline. join-timeout = 60s - # The id of the dispatcher to use for cluster actors. If not specified default dispatcher is used. + # The id of the dispatcher to use for cluster actors. If not specified + # default dispatcher is used. # If specified you need to define the settings of the actual dispatcher. use-dispatcher = "" diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index f924dc20de..48e30080be 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -158,8 +158,11 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { system.registerOnTermination(shutdown()) - private val clusterJmx = new ClusterJmx(this, log) - clusterJmx.createMBean() + private val clusterJmx: Option[ClusterJmx] = { + val jmx = new ClusterJmx(this, log) + jmx.createMBean() + Some(jmx) + } log.info("Cluster Node [{}] - has started up successfully", selfAddress) @@ -253,7 +256,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { scheduler.close() - clusterJmx.unregisterMBean() + clusterJmx foreach { _.unregisterMBean() } log.info("Cluster Node [{}] - Cluster node successfully shut down", selfAddress) } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index ff23f7b12f..1747df4dbb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -94,9 +94,9 @@ object ClusterEvent { case class ConvergenceChanged(convergence: Boolean) extends ClusterDomainEvent /** - * Leader of the cluster members changed, and/or convergence status. + * Leader of the cluster members changed. Only published after convergence. */ - case class LeaderChanged(leader: Option[Address], convergence: Boolean) extends ClusterDomainEvent + case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent /** * INTERNAL API @@ -150,7 +150,7 @@ object ClusterEvent { val convergenceEvents = if (convergenceChanged) Seq(ConvergenceChanged(newConvergence)) else Seq.empty val leaderEvents = - if (convergenceChanged || newGossip.leader != oldGossip.leader) Seq(LeaderChanged(newGossip.leader, newConvergence)) + if (newGossip.leader != oldGossip.leader) Seq(LeaderChanged(newGossip.leader)) else Seq.empty val newSeenBy = newGossip.seenBy @@ -159,7 +159,7 @@ object ClusterEvent { else Seq.empty memberEvents.toIndexedSeq ++ unreachableEvents ++ downedEvents ++ unreachableDownedEvents ++ removedEvents ++ - convergenceEvents ++ leaderEvents ++ seenEvents + leaderEvents ++ convergenceEvents ++ seenEvents } } @@ -174,6 +174,14 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto var latestGossip: Gossip = Gossip() + // Keep track of LeaderChanged event. Should not be published until + // convergence, and it should only be published when leader actually + // changed to another node. 3 states: + // - None: No LeaderChanged detected yet, nothing published yet + // - Some(Left): Stashed LeaderChanged to be published later, when convergence + // - Some(Right): Latest published LeaderChanged + var leaderChangedState: Option[Either[LeaderChanged, LeaderChanged]] = None + def receive = { case PublishChanges(oldGossip, newGossip) ⇒ publishChanges(oldGossip, newGossip) case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats) @@ -210,11 +218,38 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto // keep the latestGossip to be sent to new subscribers latestGossip = newGossip diff(oldGossip, newGossip) foreach { event ⇒ - eventStream publish event - // notify DeathWatch about unreachable node event match { - case MemberUnreachable(m) ⇒ eventStream publish AddressTerminated(m.address) - case _ ⇒ + case x @ LeaderChanged(_) if leaderChangedState == Some(Right(x)) ⇒ + // skip, this leader has already been published + + case x @ LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒ + // leader changed and immediate convergence + leaderChangedState = Some(Right(x)) + eventStream publish x + + case x: LeaderChanged ⇒ + // publish later, when convergence + leaderChangedState = Some(Left(x)) + + case ConvergenceChanged(true) ⇒ + // now it's convergence, publish eventual stashed LeaderChanged event + leaderChangedState match { + case Some(Left(x)) ⇒ + leaderChangedState = Some(Right(x)) + eventStream publish x + + case _ ⇒ // nothing stashed + } + eventStream publish event + + case MemberUnreachable(m) ⇒ + eventStream publish event + // notify DeathWatch about unreachable node + eventStream publish AddressTerminated(m.address) + + case _ ⇒ + // all other events + eventStream publish event } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index ff827574d7..0aa9e6997e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -52,11 +52,11 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { case event: MemberEvent ⇒ // replace current member with new member (might have different status, only address is used in equals) state = state.copy(members = state.members - event.member + event.member) - case LeaderChanged(leader, convergence) ⇒ state = state.copy(leader = leader, convergence = convergence) - case ConvergenceChanged(convergence) ⇒ state = state.copy(convergence = convergence) - case s: CurrentClusterState ⇒ state = s - case CurrentInternalStats(stats) ⇒ _latestStats = stats - case _ ⇒ // ignore, not interesting + case LeaderChanged(leader) ⇒ state = state.copy(leader = leader) + case ConvergenceChanged(convergence) ⇒ state = state.copy(convergence = convergence) + case s: CurrentClusterState ⇒ state = s + case CurrentInternalStats(stats) ⇒ _latestStats = stats + case _ ⇒ // ignore, not interesting } }).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener") } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index e37d4abc72..4212e59c1c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -36,6 +36,7 @@ class ClusterSettings(val config: Config, val systemName: String) { final val PublishStatsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.publish-stats-interval"), MILLISECONDS) final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join") final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down") + final val JmxEnabled: Boolean = getBoolean("akka.cluster.jmx.enabled") final val JoinTimeout: FiniteDuration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS) final val UseDispatcher: String = getString("akka.cluster.use-dispatcher") match { case "" ⇒ Dispatchers.DefaultDispatcherId diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 9c6bb9cfa9..52a9a55e21 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -32,6 +32,7 @@ import akka.remote.routing.RemoteRouterConfig import akka.actor.RootActorPath import akka.actor.ActorCell import akka.actor.RelativeActorPath +import scala.annotation.tailrec /** * [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes. @@ -168,32 +169,43 @@ private[akka] class ClusterRouteeProvider( * to use for cluster routers. */ override def createRoutees(nrOfInstances: Int): Unit = { - for (i ← 1 to settings.totalInstances; target ← selectDeploymentTarget) { - val ref = - if (settings.isRouteesPathDefined) { - context.actorFor(RootActorPath(target) / settings.routeesPathElements) - } else { - val name = "c" + childNameCounter.incrementAndGet - val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target)) - context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false) - } - // must register each one, since registered routees are used in selectDeploymentTarget - registerRoutees(Some(ref)) + @tailrec + def doCreateRoutees(): Unit = selectDeploymentTarget match { + case None ⇒ // done + case Some(target) ⇒ + val ref = + if (settings.isRouteesPathDefined) { + context.actorFor(RootActorPath(target) / settings.routeesPathElements) + } else { + val name = "c" + childNameCounter.incrementAndGet + val deploy = Deploy(config = ConfigFactory.empty(), routerConfig = routeeProps.routerConfig, + scope = RemoteScope(target)) + context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false) + } + // must register each one, since registered routees are used in selectDeploymentTarget + registerRoutees(Some(ref)) + + // recursion until all created + doCreateRoutees() } + + doCreateRoutees() } private[routing] def createRoutees(): Unit = createRoutees(settings.totalInstances) private def selectDeploymentTarget: Option[Address] = { val currentRoutees = routees - if (currentRoutees.size >= settings.totalInstances) { + val currentNodes = availbleNodes + if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) { None } else { + // find the node with least routees val numberOfRouteesPerNode: Map[Address, Int] = - Map.empty[Address, Int] ++ availbleNodes.toSeq.map(_ -> 0) ++ - currentRoutees.groupBy(fullAddress).map { - case (address, refs) ⇒ address -> refs.size - } + currentRoutees.foldLeft(currentNodes.map(_ -> 0).toMap.withDefault(_ ⇒ 0)) { (acc, x) ⇒ + val address = fullAddress(x) + acc + (address -> (acc(address) + 1)) + } val (address, count) = numberOfRouteesPerNode.minBy(_._2) if (count < settings.maxInstancesPerNode) Some(address) else None diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index ea35249303..737d6be549 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -53,6 +53,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { parallelism-max = 13 } akka.scheduler.tick-duration = 33 ms + akka.remote.log-remote-lifecycle-events = off akka.remote.netty.execution-pool-size = 4 #akka.remote.netty.reconnection-time-window = 1s akka.remote.netty.backoff-timeout = 500ms diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 489c5415ea..f38d80ace5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -33,6 +33,7 @@ object MultiNodeClusterSpec { akka.cluster { auto-join = on auto-down = off + jmx.enabled = off gossip-interval = 200 ms heartbeat-interval = 400 ms leader-actions-interval = 200 ms diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index 768546a532..40f46ffbbc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -29,6 +29,7 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { auto-join = off } akka.loglevel = INFO + akka.remote.log-remote-lifecycle-events = off """)) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala new file mode 100644 index 0000000000..c39edd8a13 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala @@ -0,0 +1,182 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster.routing + +import scala.concurrent.Await +import scala.concurrent.util.duration._ + +import com.typesafe.config.ConfigFactory + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Address +import akka.actor.Props +import akka.cluster.MultiNodeClusterSpec +import akka.pattern.ask +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.routing.ConsistentHashingRouter +import akka.routing.ConsistentHashingRouter.ConsistentHashMapping +import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope +import akka.routing.CurrentRoutees +import akka.routing.FromConfig +import akka.routing.RouterRoutees +import akka.testkit._ + +object ClusterConsistentHashingRouterMultiJvmSpec extends MultiNodeConfig { + + class Echo extends Actor { + def receive = { + case _ ⇒ sender ! self + } + } + + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + common-router-settings = { + router = consistent-hashing + nr-of-instances = 10 + cluster { + enabled = on + max-nr-of-instances-per-node = 2 + } + } + + akka.actor.deployment { + /router1 = ${common-router-settings} + /router3 = ${common-router-settings} + /router4 = ${common-router-settings} + } + """)). + withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class ClusterConsistentHashingRouterMultiJvmNode1 extends ClusterConsistentHashingRouterSpec +class ClusterConsistentHashingRouterMultiJvmNode2 extends ClusterConsistentHashingRouterSpec +class ClusterConsistentHashingRouterMultiJvmNode3 extends ClusterConsistentHashingRouterSpec + +abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterConsistentHashingRouterMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender with DefaultTimeout { + import ClusterConsistentHashingRouterMultiJvmSpec._ + + lazy val router1 = system.actorOf(Props[Echo].withRouter(FromConfig()), "router1") + + def currentRoutees(router: ActorRef) = + Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees + + /** + * Fills in self address for local ActorRef + */ + private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match { + case Address(_, _, None, None) ⇒ cluster.selfAddress + case a ⇒ a + } + + "A cluster router with a consistent hashing router" must { + "start cluster with 2 nodes" taggedAs LongRunningTest in { + awaitClusterUp(first, second) + enterBarrier("after-1") + } + + "create routees from configuration" in { + runOn(first) { + awaitCond { + // it may take some time until router receives cluster member events + currentRoutees(router1).size == 4 + } + currentRoutees(router1).map(fullAddress).toSet must be(Set(address(first), address(second))) + } + enterBarrier("after-2") + } + + "select destination based on hashKey" in { + runOn(first) { + router1 ! ConsistentHashableEnvelope(message = "A", hashKey = "a") + val destinationA = expectMsgType[ActorRef] + router1 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a") + expectMsg(destinationA) + } + enterBarrier("after-2") + } + + "deploy routees to new member nodes in the cluster" taggedAs LongRunningTest in { + + awaitClusterUp(first, second, third) + + runOn(first) { + awaitCond { + // it may take some time until router receives cluster member events + currentRoutees(router1).size == 6 + } + currentRoutees(router1).map(fullAddress).toSet must be(roles.map(address).toSet) + } + + enterBarrier("after-3") + } + + "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in { + runOn(first) { + val router2 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig(local = ConsistentHashingRouter(), + settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 2))), "router2") + awaitCond { + // it may take some time until router receives cluster member events + currentRoutees(router2).size == 6 + } + currentRoutees(router2).map(fullAddress).toSet must be(roles.map(address).toSet) + } + + enterBarrier("after-4") + } + + "handle combination of configured router and programatically defined hashMapping" taggedAs LongRunningTest in { + runOn(first) { + def hashMapping: ConsistentHashMapping = { + case s: String ⇒ s + } + + val router3 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter(hashMapping = hashMapping)), "router3") + + assertHashMapping(router3) + } + + enterBarrier("after-5") + } + + "handle combination of configured router and programatically defined hashMapping and ClusterRouterConfig" taggedAs LongRunningTest in { + runOn(first) { + def hashMapping: ConsistentHashMapping = { + case s: String ⇒ s + } + + val router4 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig( + local = ConsistentHashingRouter(hashMapping = hashMapping), + settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1))), "router4") + + assertHashMapping(router4) + } + + enterBarrier("after-6") + } + + def assertHashMapping(router: ActorRef): Unit = { + awaitCond { + // it may take some time until router receives cluster member events + currentRoutees(router).size == 6 + } + currentRoutees(router).map(fullAddress).toSet must be(roles.map(address).toSet) + + router ! "a" + val destinationA = expectMsgType[ActorRef] + router ! "a" + expectMsg(destinationA) + } + + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 2d7565f5f5..f8c5571a57 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -35,6 +35,7 @@ class ClusterConfigSpec extends AkkaSpec { JoinTimeout must be(60 seconds) AutoJoin must be(true) AutoDown must be(false) + JmxEnabled must be(true) UseDispatcher must be(Dispatchers.DefaultDispatcherId) GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001) MaxGossipMergeRate must be(5.0 plusOrMinus 0.0001) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala new file mode 100644 index 0000000000..c29f237be7 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import language.postfixOps +import scala.collection.immutable.SortedSet +import scala.concurrent.util.duration._ +import org.scalatest.BeforeAndAfterEach +import akka.actor.Address +import akka.actor.Props +import akka.cluster.MemberStatus._ +import akka.cluster.InternalClusterAction._ +import akka.cluster.ClusterEvent._ +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.actor.ActorRef + +object ClusterDomainEventPublisherSpec { + val config = """ + akka.cluster.auto-join = off + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.remote.netty.port = 0 + """ + + case class GossipTo(address: Address) +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublisherSpec.config) + with BeforeAndAfterEach with ImplicitSender { + import ClusterDomainEventPublisherSpec._ + + var publisher: ActorRef = _ + val a1 = Member(Address("akka", "sys", "a", 2552), Up) + val b1 = Member(Address("akka", "sys", "b", 2552), Up) + val c1 = Member(Address("akka", "sys", "c", 2552), Joining) + val c2 = Member(Address("akka", "sys", "c", 2552), Up) + val d1 = Member(Address("akka", "sys", "a", 2551), Up) + + val g0 = Gossip(members = SortedSet(a1)).seen(a1.address) + val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.address).seen(b1.address).seen(c1.address) + val g2 = Gossip(members = SortedSet(a1, b1, c2)).seen(a1.address) + val g3 = g2.seen(b1.address).seen(c2.address) + val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address) + val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address) + + override def beforeEach(): Unit = { + publisher = system.actorOf(Props[ClusterDomainEventPublisher]) + publisher ! Subscribe(testActor, classOf[ClusterDomainEvent]) + expectMsgType[CurrentClusterState] + } + + override def afterEach(): Unit = { + publisher ! Unsubscribe(testActor) + system.stop(publisher) + } + + "ClusterDomainEventPublisher" must { + + "publish MemberUp when member status changed to Up" in { + publisher ! PublishChanges(g1, g2) + expectMsg(MemberUp(c2)) + expectMsg(ConvergenceChanged(false)) + expectMsgType[SeenChanged] + } + + "publish convergence true when all seen it" in { + publisher ! PublishChanges(g2, g3) + expectMsg(ConvergenceChanged(true)) + expectMsgType[SeenChanged] + } + + "publish leader changed when new leader after convergence" in { + publisher ! PublishChanges(g3, g4) + expectMsg(MemberUp(d1)) + expectMsg(ConvergenceChanged(false)) + expectMsgType[SeenChanged] + expectNoMsg(1 second) + + publisher ! PublishChanges(g4, g5) + expectMsg(LeaderChanged(Some(d1.address))) + expectMsg(ConvergenceChanged(true)) + expectMsgType[SeenChanged] + } + + "publish leader changed when new leader and convergence both before and after" in { + // convergence both before and after + publisher ! PublishChanges(g3, g5) + expectMsg(MemberUp(d1)) + expectMsg(LeaderChanged(Some(d1.address))) + expectMsgType[SeenChanged] + } + + "not publish leader changed when not convergence" in { + publisher ! PublishChanges(g2, g4) + expectMsg(MemberUp(d1)) + expectNoMsg(1 second) + } + + "not publish leader changed when changed convergence but still same leader" in { + publisher ! PublishChanges(g2, g5) + expectMsg(MemberUp(d1)) + expectMsg(LeaderChanged(Some(d1.address))) + expectMsg(ConvergenceChanged(true)) + expectMsgType[SeenChanged] + + publisher ! PublishChanges(g5, g4) + expectMsg(ConvergenceChanged(false)) + expectMsgType[SeenChanged] + + publisher ! PublishChanges(g4, g5) + expectMsg(ConvergenceChanged(true)) + expectMsgType[SeenChanged] + } + + } + +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index 1bbffca3c2..3a4e3ee3a4 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -69,10 +69,9 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address).seen(e1.address) val g2 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address) - // LeaderChanged is also published when convergence changed - diff(g1, g2) must be(Seq(ConvergenceChanged(false), LeaderChanged(Some(a1.address), convergence = false), + diff(g1, g2) must be(Seq(ConvergenceChanged(false), SeenChanged(convergence = false, seenBy = Set(a1.address, b1.address)))) - diff(g2, g1) must be(Seq(ConvergenceChanged(true), LeaderChanged(Some(a1.address), convergence = true), + diff(g2, g1) must be(Seq(ConvergenceChanged(true), SeenChanged(convergence = true, seenBy = Set(a1.address, b1.address, e1.address)))) } @@ -81,8 +80,8 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { val g2 = Gossip(members = SortedSet(b1, e1), overview = GossipOverview(unreachable = Set(a1))) val g3 = g2.copy(overview = GossipOverview()).seen(b1.address).seen(e1.address) - diff(g1, g2) must be(Seq(MemberUnreachable(a1), LeaderChanged(Some(b1.address), convergence = false))) - diff(g2, g3) must be(Seq(ConvergenceChanged(true), LeaderChanged(Some(b1.address), convergence = true), + diff(g1, g2) must be(Seq(MemberUnreachable(a1), LeaderChanged(Some(b1.address)))) + diff(g2, g3) must be(Seq(ConvergenceChanged(true), SeenChanged(convergence = true, seenBy = Set(b1.address, e1.address)))) } } diff --git a/akka-docs/cluster/cluster-usage.rst b/akka-docs/cluster/cluster-usage.rst index 9d922aea97..b0ec8f08b7 100644 --- a/akka-docs/cluster/cluster-usage.rst +++ b/akka-docs/cluster/cluster-usage.rst @@ -18,7 +18,8 @@ The Akka cluster is a separate jar file. Make sure that you have the following d "com.typesafe.akka" %% "akka-cluster" % "2.1-SNAPSHOT" -If you are using the latest nightly build you should pick a timestamped Akka version from ``_. +If you are using the latest nightly build you should pick a timestamped Akka version from ``_. Don't use ``SNAPSHOT``. Note that the Scala version |scalaVersion| +is part of the artifactId. A Simple Cluster Example ^^^^^^^^^^^^^^^^^^^^^^^^ @@ -35,7 +36,9 @@ Try it out: :language: none To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-scala` -settings and the ``akka.cluster.seed-nodes`` to your ``application.conf`` file. +settings, but with ``akka.cluster.ClusterActorRefProvider``. +The ``akka.cluster.seed-nodes`` and cluster extension should normally also be added to your +``application.conf`` file. The seed nodes are configured contact points for initial, automatic, join of the cluster. @@ -44,20 +47,20 @@ ip-addresses or host names of the machines in ``application.conf`` instead of `` 2. Add the following main program to your project, place it in ``src/main/scala``: -.. literalinclude:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala +.. literalinclude:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala :language: scala 3. Start the first seed node. Open a sbt session in one terminal window and run:: - run-main sample.cluster.ClusterApp 2551 + run-main sample.cluster.simple.SimpleClusterApp 2551 2551 corresponds to the port of the first seed-nodes element in the configuration. In the log output you see that the cluster node has been started and changed status to 'Up'. 4. Start the second seed node. Open a sbt session in another terminal window and run:: - run-main sample.cluster.ClusterApp 2552 + run-main sample.cluster.simple.SimpleClusterApp 2552 2552 corresponds to the port of the second seed-nodes element in the configuration. @@ -68,7 +71,7 @@ Switch over to the first terminal window and see in the log output that the memb 5. Start another node. Open a sbt session in yet another terminal window and run:: - run-main sample.cluster.ClusterApp + run-main sample.cluster.simple.SimpleClusterApp Now you don't need to specify the port number, and it will use a random available port. It joins one of the configured seed nodes. Look at the log output in the different terminal @@ -82,7 +85,7 @@ output in the other terminals. Look at the source code of the program again. What it does is to create an actor and register it as subscriber of certain cluster events. It gets notified with -an snapshot event, 'CurrentClusterState' that holds full state information of +an snapshot event, ``CurrentClusterState`` that holds full state information of the cluster. After that it receives events for changes that happen in the cluster. Automatic vs. Manual Joining @@ -110,7 +113,7 @@ You can disable automatic joining with configuration: akka.cluster.auto-join = off -Then you need to join manually, using JMX or the provided script. +Then you need to join manually, using :ref:`cluster_jmx` or :ref:`cluster_command_line`. You can join to any node in the cluster. It doesn't have to be configured as seed node. If you are not using auto-join there is no need to configure seed nodes at all. @@ -125,7 +128,8 @@ When a member is considered by the failure detector to be unreachable the leader is not allowed to perform its duties, such as changing status of new joining members to 'Up'. The status of the unreachable member must be changed to 'Down'. This can be performed automatically or manually. By -default it must be done manually, using using JMX or the provided script. +default it must be done manually, using using :ref:`cluster_jmx` or +:ref:`cluster_command_line`. It can also be performed programatically with ``Cluster(system).down``. @@ -137,6 +141,265 @@ Be aware of that using auto-down implies that two separate clusters will automatically be formed in case of network partition. That might be desired by some applications but not by others. +Subscribe to Cluster Events +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +You can subscribe to change notifications of the cluster membership by using +``Cluster(system).subscribe``. A snapshot of the full state, +``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber +as the first event, followed by events for incremental updates. + +There are several types of change events, consult the API documentation +of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent`` +for details about the events. + +Worker Dial-in Example +---------------------- + +Let's take a look at an example that illustrates how workers, here named *backend*, +can detect and register to new master nodes, here named *frontend*. + +The example application provides a service to transform text. When some text +is sent to one of the frontend services, it will be delegated to one of the +backend workers, which performs the transformation job, and sends the result back to +the original client. New backend nodes, as well as new frontend nodes, can be +added or removed to the cluster dynamically. + +In this example the following imports are used: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#imports + +Messages: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#messages + +The backend worker that performs the transformation job: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#backend + +Note that the ``TransformationBackend`` actor subscribes to cluster events to detect new, +potential, frontend nodes, and send them a registration message so that they know +that they can use the backend worker. + +The frontend that receives user jobs and delegates to one of the registered backend workers: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#frontend + +Note that the ``TransformationFrontend`` actor watch the registered backend +to be able to remove it from its list of availble backend workers. +Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects +network failures and JVM crashes, in addition to graceful termination of watched +actor. + +This example is included in ``akka-samples/akka-sample-cluster`` +and you can try by starting nodes in different terminal windows. For example, starting 2 +frontend nodes and 3 backend nodes:: + + sbt + + project akka-sample-cluster-experimental + + run-main sample.cluster.transformation.TransformationFrontend 2551 + + run-main sample.cluster.transformation.TransformationBackend 2552 + + run-main sample.cluster.transformation.TransformationBackend + + run-main sample.cluster.transformation.TransformationBackend + + run-main sample.cluster.transformation.TransformationFrontend + + +.. note:: The above example should probably be designed as two separate, frontend/backend, clusters, when there is a `cluster client for decoupling clusters `_. + +Cluster Aware Routers +^^^^^^^^^^^^^^^^^^^^^ + +All :ref:`routers ` can be made aware of member nodes in the cluster, i.e. +deploying new routees or looking up routees on nodes in the cluster. +When a node becomes unavailble or leaves the cluster the routees of that node are +automatically unregistered from the router. When new nodes join the cluster additional +routees are added to the router, according to the configuration. + +When using a router with routees looked up on the cluster member nodes, i.e. the routees +are already running, the configuration for a router looks like this: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala#router-lookup-config + +It's the relative actor path defined in ``routees-path`` that identify what actor to lookup. + +``nr-of-instances`` defines total number of routees in the cluster, but there will not be +more than one per node. Setting ``nr-of-instances`` to a high value will result in new routees +added to the router when nodes join the cluster. + +The same type of router could also have been defined in code: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#router-lookup-in-code + +When using a router with routees created and deployed on the cluster member nodes +the configuration for a router looks like this: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala#router-deploy-config + + +``nr-of-instances`` defines total number of routees in the cluster, but the number of routees +per node, ``max-nr-of-instances-per-node``, will not be exceeded. Setting ``nr-of-instances`` +to a high value will result in creating and deploying additional routees when new nodes join +the cluster. + +The same type of router could also have been defined in code: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#router-deploy-in-code + +See :ref:`cluster_configuration` section for further descriptions of the settings. + + +Router Example +-------------- + +Let's take a look at how to use cluster aware routers. + +The example application provides a service to calculate statistics for a text. +When some text is sent to the service it splits it into words, and delegates the task +to count number of characters in each word to a separate worker, a routee of a router. +The character count for each word is sent back to an aggregator that calculates +the average number of characters per word when all results have been collected. + +In this example we use the following imports: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#imports + +Messages: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#messages + +The worker that counts number of characters in each word: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#worker + +The service that receives text from users and splits it up into words, delegates to workers and aggregates: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#service + + +Note, nothing cluster specific so far, just plain actors. + +We can use these actors with two different types of router setup. Either with lookup of routees, +or with create and deploy of routees. Remember, routees are the workers in this case. + +We start with the router setup with lookup of routees. All nodes start ``StatsService`` and +``StatsWorker`` actors and the router is configured with ``routees-path``: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-lookup + +This means that user requests can be sent to ``StatsService`` on any node and it will use +``StatsWorker`` on all nodes. There can only be one worker per node, but that worker could easily +fan out to local children if more parallelism is needed. + +This example is included in ``akka-samples/akka-sample-cluster`` +and you can try by starting nodes in different terminal windows. For example, starting 3 +service nodes and 1 client:: + + run-main sample.cluster.stats.StatsSample 2551 + + run-main sample.cluster.stats.StatsSample 2552 + + run-main sample.cluster.stats.StatsSampleClient + + run-main sample.cluster.stats.StatsSample + +The above setup is nice for this example, but we will also take a look at how to use +a single master node that creates and deploys workers. To keep track of a single +master we need one additional actor: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#facade + +The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single +master. It listens to cluster events to create or lookup the ``StatsService`` depending on if +it is on the same same node or on another node. We run the master on the same node as the leader of +the cluster members, which is nothing more than the address currently sorted first in the member ring, +i.e. it can change when new nodes join or when current leader leaves. + +All nodes start ``StatsFacade`` and the router is now configured like this: + +.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-deploy + + +This example is included in ``akka-samples/akka-sample-cluster`` +and you can try by starting nodes in different terminal windows. For example, starting 3 +service nodes and 1 client:: + + run-main sample.cluster.stats.StatsSampleOneMaster 2551 + + run-main sample.cluster.stats.StatsSampleOneMaster 2552 + + run-main sample.cluster.stats.StatsSampleOneMasterClient + + run-main sample.cluster.stats.StatsSampleOneMaster + +.. note:: The above example, especially the last part, will be simplified when the cluster handles automatic actor partitioning. + +.. _cluster_jmx: + +JMX +^^^ + +Information and management of the cluster is available as JMX MBeans with the root name ``akka.Cluster``. +The JMX information can be displayed with an ordinary JMX console such as JConsole or JVisualVM. + +From JMX you can: + +* see what members that are part of the cluster +* see status of this node +* join this node to another node in cluster +* mark any node in the cluster as down +* tell any node in the cluster to leave + +Member nodes are identified with their address, in format `akka://actor-system-name@hostname:port`. + +.. _cluster_command_line: + +Command Line Management +^^^^^^^^^^^^^^^^^^^^^^^ + +The cluster can be managed with the script `bin/akka-cluster` provided in the +Akka distribution. + +Run it without parameters to see instructions about how to use the script:: + + Usage: bin/akka-cluster ... + + Supported commands are: + join - Sends request a JOIN node with the specified URL + leave - Sends a request for node with URL to LEAVE the cluster + down - Sends a request for marking node with URL as DOWN + member-status - Asks the member node for its current status + cluster-status - Asks the cluster for its current status (member ring, + unavailable nodes, meta data etc.) + leader - Asks the cluster who the current leader is + is-singleton - Checks if the cluster is a singleton cluster (single + node cluster) + is-available - Checks if the member node is available + is-running - Checks if the member node is running + has-convergence - Checks if there is a cluster convergence + Where the should be on the format of 'akka://actor-system-name@hostname:port' + + Examples: bin/akka-cluster localhost:9999 is-available + bin/akka-cluster localhost:9999 join akka://MySystem@darkstar:2552 + bin/akka-cluster localhost:9999 cluster-status + + +To be able to use the script you must enable remote monitoring and management when starting the JVMs of the cluster nodes, +as described in `Monitoring and Management Using JMX Technology `_ + +Example of system properties to enable remote monitoring and management:: + + java -Dcom.sun.management.jmxremote.port=9999 \ + -Dcom.sun.management.jmxremote.authenticate=false \ + -Dcom.sun.management.jmxremote.ssl=false + +.. _cluster_configuration: + Configuration ^^^^^^^^^^^^^ @@ -147,6 +410,9 @@ reference file for more information: .. literalinclude:: ../../akka-cluster/src/main/resources/reference.conf :language: none +Cluster Scheduler +----------------- + It is recommended that you change the ``tick-duration`` to 33 ms or less of the default scheduler when using cluster, if you don't need to have it configured to a longer duration for other reasons. If you don't do this diff --git a/akka-kernel/src/main/dist/bin/akka-cluster b/akka-kernel/src/main/dist/bin/akka-cluster index fe3af38449..0cbff520dd 100755 --- a/akka-kernel/src/main/dist/bin/akka-cluster +++ b/akka-kernel/src/main/dist/bin/akka-cluster @@ -27,7 +27,7 @@ HOST=$1 # cluster node:port to talk to through JMX function ensureNodeIsRunningAndAvailable { REPLY=$($JMX_CLIENT $HOST akka:type=Cluster Available 2>&1 >/dev/null) # redirects STDERR to STDOUT before capturing it if [[ "$REPLY" != *true ]]; then - echo "Akka cluster node is not available on $HOST" + echo "Akka cluster node is not available on $HOST, due to $REPLY" exit 1 fi } @@ -37,7 +37,7 @@ case "$2" in join) if [ $# -ne 3 ]; then - echo "Usage: $SELF join " + echo "Usage: $SELF join " exit 1 fi @@ -51,7 +51,7 @@ case "$2" in leave) if [ $# -ne 3 ]; then - echo "Usage: $SELF leave " + echo "Usage: $SELF leave " exit 1 fi @@ -65,7 +65,7 @@ case "$2" in down) if [ $# -ne 3 ]; then - echo "Usage: $SELF down " + echo "Usage: $SELF down " exit 1 fi @@ -164,7 +164,7 @@ case "$2" in ensureNodeIsRunningAndAvailable shift - echo "Checking if member node on $HOST is AVAILABLE" + echo "Checking if member node on $HOST is RUNNING" $JMX_CLIENT $HOST akka:type=Cluster Running ;; @@ -172,17 +172,17 @@ case "$2" in printf "Usage: bin/$SELF ...\n" printf "\n" printf "Supported commands are:\n" - printf "%26s - %s\n" "join " "Sends request a JOIN node with the specified URL" - printf "%26s - %s\n" "leave " "Sends a request for node with URL to LEAVE the cluster" - printf "%26s - %s\n" "down " "Sends a request for marking node with URL as DOWN" - printf "%26s - %s\n" member-status "Asks the member node for its current status" - printf "%26s - %s\n" cluster-status "Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.)" - printf "%26s - %s\n" leader "Asks the cluster who the current leader is" - printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)" - printf "%26s - %s\n" is-available "Checks if the member node is available" - printf "%26s - %s\n" is-running "Checks if the member node is running" - printf "%26s - %s\n" has-convergence "Checks if there is a cluster convergence" - printf "Where the should be on the format of 'akka://actor-system-name@hostname:port'\n" + printf "%26s - %s\n" "join " "Sends request a JOIN node with the specified URL" + printf "%26s - %s\n" "leave " "Sends a request for node with URL to LEAVE the cluster" + printf "%26s - %s\n" "down " "Sends a request for marking node with URL as DOWN" + printf "%26s - %s\n" member-status "Asks the member node for its current status" + printf "%26s - %s\n" cluster-status "Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.)" + printf "%26s - %s\n" leader "Asks the cluster who the current leader is" + printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)" + printf "%26s - %s\n" is-available "Checks if the member node is available" + printf "%26s - %s\n" is-running "Checks if the member node is running" + printf "%26s - %s\n" has-convergence "Checks if there is a cluster convergence" + printf "Where the should be on the format of 'akka://actor-system-name@hostname:port'\n" printf "\n" printf "Examples: bin/$SELF localhost:9999 is-available\n" printf " bin/$SELF localhost:9999 join akka://MySystem@darkstar:2552\n" diff --git a/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala index c22ff7dfa8..8a4e3bce7c 100644 --- a/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala @@ -79,7 +79,8 @@ final class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContex override def createRoutees(nrOfInstances: Int): Unit = { val refs = IndexedSeq.fill(nrOfInstances) { val name = "c" + childNameCounter.incrementAndGet - val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(nodeAddressIter.next)) + val deploy = Deploy(config = ConfigFactory.empty(), routerConfig = routeeProps.routerConfig, + scope = RemoteScope(nodeAddressIter.next)) // attachChild means that the provider will treat this call as if possibly done out of the wrong // context and use RepointableActorRef instead of LocalActorRef. Seems like a slightly sub-optimal diff --git a/akka-samples/akka-sample-cluster/src/main/resources/application.conf b/akka-samples/akka-sample-cluster/src/main/resources/application.conf index a54c87510c..62554a65cf 100644 --- a/akka-samples/akka-sample-cluster/src/main/resources/application.conf +++ b/akka-samples/akka-sample-cluster/src/main/resources/application.conf @@ -1,9 +1,10 @@ akka { actor { - provider = "akka.remote.RemoteActorRefProvider" + provider = "akka.cluster.ClusterActorRefProvider" } remote { transport = "akka.remote.netty.NettyRemoteTransport" + log-remote-lifecycle-events = off netty { hostname = "127.0.0.1" port = 0 @@ -16,5 +17,7 @@ akka { seed-nodes = [ "akka://ClusterSystem@127.0.0.1:2551", "akka://ClusterSystem@127.0.0.1:2552"] + + auto-down = on } } \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala similarity index 94% rename from akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala rename to akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala index 0fd396784d..4f69700835 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala @@ -1,10 +1,10 @@ -package sample.cluster +package sample.cluster.simple import akka.actor._ import akka.cluster.Cluster import akka.cluster.ClusterEvent._ -object ClusterApp { +object SimpleClusterApp { def main(args: Array[String]): Unit = { diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala new file mode 100644 index 0000000000..8a847d0b05 --- /dev/null +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala @@ -0,0 +1,262 @@ +package sample.cluster.stats + +//#imports +import language.postfixOps +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.concurrent.util.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.Props +import akka.actor.ReceiveTimeout +import akka.actor.RelativeActorPath +import akka.actor.RootActorPath +import akka.cluster.Cluster +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ClusterEvent.LeaderChanged +import akka.cluster.ClusterEvent.MemberEvent +import akka.cluster.ClusterEvent.MemberUp +import akka.cluster.MemberStatus +import akka.routing.FromConfig +import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope +//#imports + +//#messages +case class StatsJob(text: String) +case class StatsResult(meanWordLength: Double) +case class JobFailed(reason: String) +//#messages + +//#service +class StatsService extends Actor { + val workerRouter = context.actorOf(Props[StatsWorker].withRouter(FromConfig), + name = "workerRouter") + + def receive = { + case StatsJob(text) if text != "" ⇒ + val words = text.split(" ") + val replyTo = sender // important to not close over sender + val aggregator = context.actorOf(Props(new StatsAggregator(words.size, replyTo))) + words foreach { word ⇒ + workerRouter.tell( + ConsistentHashableEnvelope(word, word), aggregator) + } + } +} + +class StatsAggregator(expectedResults: Int, replyTo: ActorRef) extends Actor { + var results = IndexedSeq.empty[Int] + context.setReceiveTimeout(10 seconds) + + def receive = { + case wordCount: Int ⇒ + results = results :+ wordCount + if (results.size == expectedResults) { + val meanWordLength = results.sum.toDouble / results.size + replyTo ! StatsResult(meanWordLength) + context.stop(self) + } + case ReceiveTimeout ⇒ + replyTo ! JobFailed("Service unavailable, try again later") + context.stop(self) + } +} +//#service + +//#worker +class StatsWorker extends Actor { + var cache = Map.empty[String, Int] + def receive = { + case word: String ⇒ + val length = cache.get(word) match { + case Some(x) ⇒ x + case None ⇒ + val x = word.length + cache += (word -> x) + x + } + + sender ! length + } +} +//#worker + +//#facade +class StatsFacade extends Actor with ActorLogging { + val cluster = Cluster(context.system) + + var currentMaster: Option[ActorRef] = None + var currentMasterCreatedByMe = false + + // subscribe to cluster changes, LeaderChanged + // re-subscribe when restart + override def preStart(): Unit = cluster.subscribe(self, classOf[LeaderChanged]) + override def postStop(): Unit = cluster.unsubscribe(self) + + def receive = { + case job: StatsJob if currentMaster.isEmpty ⇒ + sender ! JobFailed("Service unavailable, try again later") + case job: StatsJob ⇒ + currentMaster foreach { _ forward job } + case state: CurrentClusterState ⇒ + state.leader foreach updateCurrentMaster + case LeaderChanged(Some(leaderAddress)) ⇒ + updateCurrentMaster(leaderAddress) + } + + def updateCurrentMaster(leaderAddress: Address): Unit = { + if (leaderAddress == cluster.selfAddress) { + if (!currentMasterCreatedByMe) { + log.info("Creating new statsService master at [{}]", leaderAddress) + currentMaster = Some(context.actorOf(Props[StatsService], name = "statsService")) + currentMasterCreatedByMe = true + } + } else { + if (currentMasterCreatedByMe) + currentMaster foreach { context.stop(_) } + log.info("Using statsService master at [{}]", leaderAddress) + currentMaster = Some(context.actorFor( + context.self.path.toStringWithAddress(leaderAddress) + "/statsService")) + currentMasterCreatedByMe = false + } + } + +} +//#facade + +object StatsSample { + def main(args: Array[String]): Unit = { + // Override the configuration of the port + // when specified as program argument + if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0)) + + //#start-router-lookup + val system = ActorSystem("ClusterSystem", ConfigFactory.parseString(""" + akka.actor.deployment { + /statsService/workerRouter { + router = consistent-hashing + nr-of-instances = 100 + cluster { + enabled = on + routees-path = "/user/statsWorker" + allow-local-routees = on + } + } + } + """).withFallback(ConfigFactory.load())) + + system.actorOf(Props[StatsWorker], name = "statsWorker") + system.actorOf(Props[StatsService], name = "statsService") + //#start-router-lookup + + } +} + +object StatsSampleOneMaster { + def main(args: Array[String]): Unit = { + // Override the configuration of the port + // when specified as program argument + if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0)) + + //#start-router-deploy + val system = ActorSystem("ClusterSystem", ConfigFactory.parseString(""" + akka.actor.deployment { + /statsFacade/statsService/workerRouter { + router = consistent-hashing + nr-of-instances = 100 + cluster { + enabled = on + max-nr-of-instances-per-node = 3 + allow-local-routees = off + } + } + } + """).withFallback(ConfigFactory.load())) + //#start-router-deploy + + system.actorOf(Props[StatsFacade], name = "statsFacade") + } +} + +object StatsSampleClient { + def main(args: Array[String]): Unit = { + val system = ActorSystem("ClusterSystem") + system.actorOf(Props(new StatsSampleClient("/user/statsService")), "client") + } +} + +object StatsSampleOneMasterClient { + def main(args: Array[String]): Unit = { + val system = ActorSystem("ClusterSystem") + system.actorOf(Props(new StatsSampleClient("/user/statsFacade")), "client") + } +} + +class StatsSampleClient(servicePath: String) extends Actor { + val cluster = Cluster(context.system) + val servicePathElements = servicePath match { + case RelativeActorPath(elements) ⇒ elements + case _ ⇒ throw new IllegalArgumentException( + "servicePath [%s] is not a valid relative actor path" format servicePath) + } + import context.dispatcher + val tickTask = context.system.scheduler.schedule(2 seconds, 2 seconds, self, "tick") + + var nodes = Set.empty[Address] + + override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) + override def postStop(): Unit = { + cluster.unsubscribe(self) + tickTask.cancel() + } + + def receive = { + case "tick" if nodes.nonEmpty ⇒ + // just pick any one + val address = nodes.toIndexedSeq(ThreadLocalRandom.current.nextInt(nodes.size)) + val service = context.actorFor(RootActorPath(address) / servicePathElements) + service ! StatsJob("this is the text that will be analyzed") + case result: StatsResult ⇒ + println(result) + case failed: JobFailed ⇒ + println(failed) + case state: CurrentClusterState ⇒ + nodes = state.members.collect { case m if m.status == MemberStatus.Up ⇒ m.address } + case MemberUp(m) ⇒ nodes += m.address + case other: MemberEvent ⇒ nodes -= other.member.address + } + +} + +// not used, only for documentation +abstract class StatsService2 extends Actor { + //#router-lookup-in-code + import akka.cluster.routing.ClusterRouterConfig + import akka.cluster.routing.ClusterRouterSettings + import akka.routing.ConsistentHashingRouter + + val workerRouter = context.actorOf(Props[StatsWorker].withRouter( + ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings( + totalInstances = 100, routeesPath = "/user/statsWorker", + allowLocalRoutees = true))), + name = "workerRouter2") + //#router-lookup-in-code +} + +// not used, only for documentation +abstract class StatsService3 extends Actor { + //#router-deploy-in-code + import akka.cluster.routing.ClusterRouterConfig + import akka.cluster.routing.ClusterRouterSettings + import akka.routing.ConsistentHashingRouter + + val workerRouter = context.actorOf(Props[StatsWorker].withRouter( + ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings( + totalInstances = 100, maxInstancesPerNode = 3, + allowLocalRoutees = false))), + name = "workerRouter3") + //#router-deploy-in-code +} diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala new file mode 100644 index 0000000000..e26bcd245d --- /dev/null +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala @@ -0,0 +1,108 @@ +package sample.cluster.transformation + +//#imports +import language.postfixOps +import scala.concurrent.util.duration._ + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Props +import akka.actor.RootActorPath +import akka.actor.Terminated +import akka.cluster.Cluster +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ClusterEvent.MemberEvent +import akka.cluster.ClusterEvent.MemberUp +import akka.cluster.Member +import akka.cluster.MemberStatus +import akka.pattern.ask +import akka.util.Timeout +//#imports + +//#messages +case class TransformationJob(text: String) +case class TransformationResult(text: String) +case class JobFailed(reason: String, job: TransformationJob) +case object BackendRegistration +//#messages + +object TransformationFrontend { + def main(args: Array[String]): Unit = { + // Override the configuration of the port + // when specified as program argument + if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0)) + + val system = ActorSystem("ClusterSystem") + val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend") + + import system.dispatcher + implicit val timeout = Timeout(5 seconds) + for (n ← 1 to 120) { + (frontend ? TransformationJob("hello-" + n)) onSuccess { + case result ⇒ println(result) + } + Thread.sleep(2000) + } + system.shutdown() + } +} + +//#frontend +class TransformationFrontend extends Actor { + + var backends = IndexedSeq.empty[ActorRef] + var jobCounter = 0 + + def receive = { + case job: TransformationJob if backends.isEmpty ⇒ + sender ! JobFailed("Service unavailable, try again later", job) + + case job: TransformationJob ⇒ + jobCounter += 1 + backends(jobCounter % backends.size) forward job + + case BackendRegistration if !backends.contains(sender) ⇒ + context watch sender + backends = backends :+ sender + + case Terminated(a) ⇒ backends.filterNot(_ == a) + } +} +//#frontend + +object TransformationBackend { + def main(args: Array[String]): Unit = { + // Override the configuration of the port + // when specified as program argument + if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0)) + + val system = ActorSystem("ClusterSystem") + system.actorOf(Props[TransformationBackend], name = "backend") + } +} + +//#backend +class TransformationBackend extends Actor { + + val cluster = Cluster(context.system) + + // subscribe to cluster changes, MemberEvent + // re-subscribe when restart + override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) + override def postStop(): Unit = cluster.unsubscribe(self) + + def receive = { + case TransformationJob(text) ⇒ sender ! TransformationResult(text.toUpperCase) + case state: CurrentClusterState ⇒ + state.members.filter(_.status == MemberStatus.Up) foreach register + case MemberUp(m) ⇒ register(m) + } + + // try to register to all nodes, even though there + // might not be any frontend on all nodes + def register(member: Member): Unit = + context.actorFor(RootActorPath(member.address) / "user" / "frontend") ! + BackendRegistration +} +//#backend \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala new file mode 100644 index 0000000000..b1d27cd7a3 --- /dev/null +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala @@ -0,0 +1,101 @@ +package sample.cluster.stats + +import language.postfixOps +import scala.concurrent.util.duration._ + +import com.typesafe.config.ConfigFactory + +import StatsSampleSpec.first +import StatsSampleSpec.third +import akka.actor.Props +import akka.actor.RootActorPath +import akka.cluster.Cluster +import akka.cluster.Member +import akka.cluster.MemberStatus +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ClusterEvent.MemberUp +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit.ImplicitSender + +object StatsSampleSingleMasterSpec extends MultiNodeConfig { + // register the named roles (nodes) of the test + val first = role("first") + val second = role("second") + val third = role("thrid") + + // this configuration will be used for all nodes + // note that no fixed host names and ports are used + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-join = off + #//#router-deploy-config + akka.actor.deployment { + /statsFacade/statsService/workerRouter { + router = consistent-hashing + nr-of-instances = 100 + cluster { + enabled = on + max-nr-of-instances-per-node = 3 + allow-local-routees = off + } + } + } + #//#router-deploy-config + """)) + +} + +// need one concrete test class per node +class StatsSampleSingleMasterMultiJvmNode1 extends StatsSampleSingleMasterSpec +class StatsSampleSingleMasterMultiJvmNode2 extends StatsSampleSingleMasterSpec +class StatsSampleSingleMasterMultiJvmNode3 extends StatsSampleSingleMasterSpec + +abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSingleMasterSpec) + with STMultiNodeSpec with ImplicitSender { + + import StatsSampleSpec._ + + override def initialParticipants = roles.size + + "The stats sample with single master" must { + "illustrate how to startup cluster" in within(10 seconds) { + Cluster(system).subscribe(testActor, classOf[MemberUp]) + expectMsgClass(classOf[CurrentClusterState]) + + Cluster(system) join node(first).address + system.actorOf(Props[StatsFacade], "statsFacade") + + expectMsgAllOf( + MemberUp(Member(node(first).address, MemberStatus.Up)), + MemberUp(Member(node(second).address, MemberStatus.Up)), + MemberUp(Member(node(third).address, MemberStatus.Up))) + + Cluster(system).unsubscribe(testActor) + + testConductor.enter("all-up") + } + + "show usage of the statsFacade" in within(5 seconds) { + val facade = system.actorFor(RootActorPath(node(third).address) / "user" / "statsFacade") + + // eventually the service should be ok, + // worker nodes might not be up yet + awaitCond { + facade ! StatsJob("this is the text that will be analyzed") + expectMsgPF() { + case unavailble: JobFailed ⇒ false + case StatsResult(meanWordLength) ⇒ + meanWordLength must be(3.875 plusOrMinus 0.001) + true + } + } + + testConductor.enter("done") + } + } + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala new file mode 100644 index 0000000000..9f88597051 --- /dev/null +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala @@ -0,0 +1,92 @@ +package sample.cluster.stats + +import language.postfixOps +import scala.concurrent.util.duration._ +import com.typesafe.config.ConfigFactory + +import akka.actor.Props +import akka.actor.RootActorPath +import akka.cluster.Cluster +import akka.cluster.Member +import akka.cluster.MemberStatus +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ClusterEvent.MemberUp +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit.ImplicitSender + +object StatsSampleSpec extends MultiNodeConfig { + // register the named roles (nodes) of the test + val first = role("first") + val second = role("second") + val third = role("thrid") + + // this configuration will be used for all nodes + // note that no fixed host names and ports are used + commonConfig(ConfigFactory.parseString(""" + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-join = off + #//#router-lookup-config + akka.actor.deployment { + /statsService/workerRouter { + router = consistent-hashing + nr-of-instances = 100 + cluster { + enabled = on + routees-path = "/user/statsWorker" + allow-local-routees = on + } + } + } + #//#router-lookup-config + """)) + +} + +// need one concrete test class per node +class StatsSampleMultiJvmNode1 extends StatsSampleSpec +class StatsSampleMultiJvmNode2 extends StatsSampleSpec +class StatsSampleMultiJvmNode3 extends StatsSampleSpec + +abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpec) + with STMultiNodeSpec with ImplicitSender { + + import StatsSampleSpec._ + + override def initialParticipants = roles.size + + "The stats sample" must { + "illustrate how to startup cluster" in within(10 seconds) { + Cluster(system).subscribe(testActor, classOf[MemberUp]) + expectMsgClass(classOf[CurrentClusterState]) + + Cluster(system) join node(first).address + system.actorOf(Props[StatsWorker], "statsWorker") + system.actorOf(Props[StatsService], "statsService") + + expectMsgAllOf( + MemberUp(Member(node(first).address, MemberStatus.Up)), + MemberUp(Member(node(second).address, MemberStatus.Up)), + MemberUp(Member(node(third).address, MemberStatus.Up))) + + Cluster(system).unsubscribe(testActor) + + testConductor.enter("all-up") + } + + "show usage of the statsService" in within(5 seconds) { + + val service = system.actorFor(RootActorPath(node(third).address) / "user" / "statsService") + service ! StatsJob("this is the text that will be analyzed") + val meanWordLength = expectMsgPF() { + case StatsResult(meanWordLength) ⇒ meanWordLength + } + meanWordLength must be(3.875 plusOrMinus 0.001) + + testConductor.enter("done") + } + } + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala new file mode 100644 index 0000000000..98b4cb6526 --- /dev/null +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala @@ -0,0 +1,116 @@ +package sample.cluster.transformation + +import language.postfixOps +import scala.concurrent.util.duration._ + +import com.typesafe.config.ConfigFactory + +import akka.actor.Props +import akka.cluster.Cluster +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit.ImplicitSender + +object TransformationSampleSpec extends MultiNodeConfig { + // register the named roles (nodes) of the test + val frontend1 = role("frontend1") + val frontend2 = role("frontend2") + val backend1 = role("backend1") + val backend2 = role("backend2") + val backend3 = role("backend3") + + // this configuration will be used for all nodes + // note that no fixed host names and ports are used + commonConfig(ConfigFactory.parseString(""" + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-join = off + """)) + +} + +// need one concrete test class per node +class TransformationSampleMultiJvmNode1 extends TransformationSampleSpec +class TransformationSampleMultiJvmNode2 extends TransformationSampleSpec +class TransformationSampleMultiJvmNode3 extends TransformationSampleSpec +class TransformationSampleMultiJvmNode4 extends TransformationSampleSpec +class TransformationSampleMultiJvmNode5 extends TransformationSampleSpec + +abstract class TransformationSampleSpec extends MultiNodeSpec(TransformationSampleSpec) + with STMultiNodeSpec with ImplicitSender { + + import TransformationSampleSpec._ + + override def initialParticipants = roles.size + + "The transformation sample" must { + "illustrate how to start first frontend" in { + runOn(frontend1) { + // this will only run on the 'first' node + Cluster(system) join node(frontend1).address + val transformationFrontend = system.actorOf(Props[TransformationFrontend], name = "frontend") + transformationFrontend ! TransformationJob("hello") + expectMsgPF() { + // no backends yet, service unavailble + case JobFailed(_, TransformationJob("hello")) ⇒ + } + } + + // this will run on all nodes + // use barrier to coordinate test steps + testConductor.enter("frontend1-started") + } + + "illustrate how a backend automatically registers" in within(15 seconds) { + runOn(backend1) { + Cluster(system) join node(frontend1).address + system.actorOf(Props[TransformationBackend], name = "backend") + } + testConductor.enter("backend1-started") + + runOn(frontend1) { + assertServiceOk + } + + testConductor.enter("frontend1-backend1-ok") + } + + "illustrate how more nodes registers" in within(15 seconds) { + runOn(frontend2) { + Cluster(system) join node(frontend1).address + system.actorOf(Props[TransformationFrontend], name = "frontend") + } + runOn(backend2, backend3) { + Cluster(system) join node(backend1).address + system.actorOf(Props[TransformationBackend], name = "backend") + } + + testConductor.enter("all-started") + + runOn(frontend1, frontend2) { + assertServiceOk + } + + testConductor.enter("all-ok") + + } + + } + + def assertServiceOk: Unit = { + val transformationFrontend = system.actorFor("akka://" + system.name + "/user/frontend") + // eventually the service should be ok, + // backends might not have registered initially + awaitCond { + transformationFrontend ! TransformationJob("hello") + expectMsgPF() { + case unavailble: JobFailed ⇒ false + case TransformationResult(result) ⇒ + result must be("HELLO") + true + } + } + } + +} \ No newline at end of file diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 2bbf27f3d2..597c7a8b99 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -328,11 +328,20 @@ object AkkaBuild extends Build { ) lazy val clusterSample = Project( - id = "akka-sample-cluster", + id = "akka-sample-cluster-experimental", base = file("akka-samples/akka-sample-cluster"), - dependencies = Seq(cluster), - settings = defaultSettings ++ Seq( publishArtifact in Compile := false ) - ) + dependencies = Seq(cluster, remoteTests % "compile;test->test;multi-jvm->multi-jvm", testkit % "test->test"), + settings = defaultSettings ++ multiJvmSettings ++ Seq( + // disable parallel tests + parallelExecution in Test := false, + extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src => + (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq + }, + scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions, + jvmOptions in MultiJvm := defaultMultiJvmOptions, + publishArtifact in Compile := false + ) + ) configs (MultiJvm) lazy val docs = Project( id = "akka-docs", diff --git a/project/scripts/multi-node-log-replace b/project/scripts/multi-node-log-replace deleted file mode 100755 index 83f1b8a136..0000000000 --- a/project/scripts/multi-node-log-replace +++ /dev/null @@ -1,11 +0,0 @@ -#!/usr/bin/env bash -# -# Utility to make log files from multi-node tests easier to analyze. -# Replaces jvm names and host:port with corresponding logical role name. -# - - -# check for an sbt command -type -P sbt &> /dev/null || fail "sbt command not found" - -sbt "project akka-remote-tests" "test:run-main akka.remote.testkit.LogRoleReplace $1 $2" \ No newline at end of file