Switch to ConistentHashingRouter in cluster sample

This commit is contained in:
Patrik Nordwall 2012-09-20 12:58:51 +02:00
parent bc34adf624
commit c7b966b4e7
3 changed files with 24 additions and 16 deletions

View file

@ -21,6 +21,7 @@ import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.MemberStatus import akka.cluster.MemberStatus
import akka.routing.FromConfig import akka.routing.FromConfig
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
//#imports //#imports
//#messages //#messages
@ -39,7 +40,10 @@ class StatsService extends Actor {
val words = text.split(" ") val words = text.split(" ")
val replyTo = sender // important to not close over sender val replyTo = sender // important to not close over sender
val aggregator = context.actorOf(Props(new StatsAggregator(words.size, replyTo))) val aggregator = context.actorOf(Props(new StatsAggregator(words.size, replyTo)))
words foreach { word workerRouter.tell(word, aggregator) } words foreach { word
workerRouter.tell(
ConsistentHashableEnvelope(word, word), aggregator)
}
} }
} }
@ -64,9 +68,18 @@ class StatsAggregator(expectedResults: Int, replyTo: ActorRef) extends Actor {
//#worker //#worker
class StatsWorker extends Actor { class StatsWorker extends Actor {
// FIXME add a cache here to illustrate consistent hashing var cache = Map.empty[String, Int]
def receive = { def receive = {
case word: String sender ! word.length case word: String
val length = cache.get(word) match {
case Some(x) x
case None
val x = word.length
cache += (word -> x)
x
}
sender ! length
} }
} }
//#worker //#worker
@ -124,8 +137,7 @@ object StatsSample {
val system = ActorSystem("ClusterSystem", ConfigFactory.parseString(""" val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
akka.actor.deployment { akka.actor.deployment {
/statsService/workerRouter { /statsService/workerRouter {
# FIXME use consistent hashing instead router = consistent-hashing
router = round-robin
nr-of-instances = 100 nr-of-instances = 100
cluster { cluster {
enabled = on enabled = on
@ -153,8 +165,7 @@ object StatsSampleOneMaster {
val system = ActorSystem("ClusterSystem", ConfigFactory.parseString(""" val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
akka.actor.deployment { akka.actor.deployment {
/statsFacade/statsService/workerRouter { /statsFacade/statsService/workerRouter {
# FIXME use consistent hashing instead router = consistent-hashing
router = round-robin
nr-of-instances = 100 nr-of-instances = 100
cluster { cluster {
enabled = on enabled = on
@ -225,10 +236,10 @@ abstract class StatsService2 extends Actor {
//#router-lookup-in-code //#router-lookup-in-code
import akka.cluster.routing.ClusterRouterConfig import akka.cluster.routing.ClusterRouterConfig
import akka.cluster.routing.ClusterRouterSettings import akka.cluster.routing.ClusterRouterSettings
import akka.routing.RoundRobinRouter import akka.routing.ConsistentHashingRouter
val workerRouter = context.actorOf(Props[StatsWorker].withRouter( val workerRouter = context.actorOf(Props[StatsWorker].withRouter(
ClusterRouterConfig(RoundRobinRouter(), ClusterRouterSettings( ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings(
totalInstances = 100, routeesPath = "/user/statsWorker", totalInstances = 100, routeesPath = "/user/statsWorker",
allowLocalRoutees = true))), allowLocalRoutees = true))),
name = "workerRouter2") name = "workerRouter2")
@ -240,11 +251,10 @@ abstract class StatsService3 extends Actor {
//#router-deploy-in-code //#router-deploy-in-code
import akka.cluster.routing.ClusterRouterConfig import akka.cluster.routing.ClusterRouterConfig
import akka.cluster.routing.ClusterRouterSettings import akka.cluster.routing.ClusterRouterSettings
// FIXME use ConsistentHashingRouter instead import akka.routing.ConsistentHashingRouter
import akka.routing.RoundRobinRouter
val workerRouter = context.actorOf(Props[StatsWorker].withRouter( val workerRouter = context.actorOf(Props[StatsWorker].withRouter(
ClusterRouterConfig(RoundRobinRouter(), ClusterRouterSettings( ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings(
totalInstances = 100, maxInstancesPerNode = 3, totalInstances = 100, maxInstancesPerNode = 3,
allowLocalRoutees = false))), allowLocalRoutees = false))),
name = "workerRouter3") name = "workerRouter3")

View file

@ -35,8 +35,7 @@ object StatsSampleSingleMasterSpec extends MultiNodeConfig {
#//#router-deploy-config #//#router-deploy-config
akka.actor.deployment { akka.actor.deployment {
/statsFacade/statsService/workerRouter { /statsFacade/statsService/workerRouter {
# FIXME use consistent hashing instead router = consistent-hashing
router = round-robin
nr-of-instances = 100 nr-of-instances = 100
cluster { cluster {
enabled = on enabled = on

View file

@ -31,8 +31,7 @@ object StatsSampleSpec extends MultiNodeConfig {
#//#router-lookup-config #//#router-lookup-config
akka.actor.deployment { akka.actor.deployment {
/statsService/workerRouter { /statsService/workerRouter {
# FIXME use consistent hashing instead router = consistent-hashing
router = round-robin
nr-of-instances = 100 nr-of-instances = 100
cluster { cluster {
enabled = on enabled = on