diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala index 351b5d0a48..8a847d0b05 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala @@ -21,6 +21,7 @@ import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent.MemberUp import akka.cluster.MemberStatus import akka.routing.FromConfig +import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope //#imports //#messages @@ -39,7 +40,10 @@ class StatsService extends Actor { val words = text.split(" ") val replyTo = sender // important to not close over sender val aggregator = context.actorOf(Props(new StatsAggregator(words.size, replyTo))) - words foreach { word ⇒ workerRouter.tell(word, aggregator) } + words foreach { word ⇒ + workerRouter.tell( + ConsistentHashableEnvelope(word, word), aggregator) + } } } @@ -64,9 +68,18 @@ class StatsAggregator(expectedResults: Int, replyTo: ActorRef) extends Actor { //#worker class StatsWorker extends Actor { - // FIXME add a cache here to illustrate consistent hashing + var cache = Map.empty[String, Int] def receive = { - case word: String ⇒ sender ! word.length + case word: String ⇒ + val length = cache.get(word) match { + case Some(x) ⇒ x + case None ⇒ + val x = word.length + cache += (word -> x) + x + } + + sender ! length } } //#worker @@ -124,8 +137,7 @@ object StatsSample { val system = ActorSystem("ClusterSystem", ConfigFactory.parseString(""" akka.actor.deployment { /statsService/workerRouter { - # FIXME use consistent hashing instead - router = round-robin + router = consistent-hashing nr-of-instances = 100 cluster { enabled = on @@ -153,8 +165,7 @@ object StatsSampleOneMaster { val system = ActorSystem("ClusterSystem", ConfigFactory.parseString(""" akka.actor.deployment { /statsFacade/statsService/workerRouter { - # FIXME use consistent hashing instead - router = round-robin + router = consistent-hashing nr-of-instances = 100 cluster { enabled = on @@ -225,10 +236,10 @@ abstract class StatsService2 extends Actor { //#router-lookup-in-code import akka.cluster.routing.ClusterRouterConfig import akka.cluster.routing.ClusterRouterSettings - import akka.routing.RoundRobinRouter + import akka.routing.ConsistentHashingRouter val workerRouter = context.actorOf(Props[StatsWorker].withRouter( - ClusterRouterConfig(RoundRobinRouter(), ClusterRouterSettings( + ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings( totalInstances = 100, routeesPath = "/user/statsWorker", allowLocalRoutees = true))), name = "workerRouter2") @@ -240,11 +251,10 @@ abstract class StatsService3 extends Actor { //#router-deploy-in-code import akka.cluster.routing.ClusterRouterConfig import akka.cluster.routing.ClusterRouterSettings - // FIXME use ConsistentHashingRouter instead - import akka.routing.RoundRobinRouter + import akka.routing.ConsistentHashingRouter val workerRouter = context.actorOf(Props[StatsWorker].withRouter( - ClusterRouterConfig(RoundRobinRouter(), ClusterRouterSettings( + ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings( totalInstances = 100, maxInstancesPerNode = 3, allowLocalRoutees = false))), name = "workerRouter3") diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala index e23504d084..b1d27cd7a3 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala @@ -35,8 +35,7 @@ object StatsSampleSingleMasterSpec extends MultiNodeConfig { #//#router-deploy-config akka.actor.deployment { /statsFacade/statsService/workerRouter { - # FIXME use consistent hashing instead - router = round-robin + router = consistent-hashing nr-of-instances = 100 cluster { enabled = on diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala index 7398aa025b..9f88597051 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala @@ -31,8 +31,7 @@ object StatsSampleSpec extends MultiNodeConfig { #//#router-lookup-config akka.actor.deployment { /statsService/workerRouter { - # FIXME use consistent hashing instead - router = round-robin + router = consistent-hashing nr-of-instances = 100 cluster { enabled = on