From ad184058777740089ca363e6ed96cfbb3155d5fd Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 14 Mar 2014 16:32:54 +0100 Subject: [PATCH] =sam #3843 Use ClusterSingletonProxy in cluster samples --- akka-docs/rst/java/cluster-usage.rst | 8 +- akka-docs/rst/scala/cluster-usage.rst | 8 +- .../sample/cluster/stats/StatsFacade.java | 95 ------------------- .../stats/StatsSampleOneMasterClientMain.java | 2 +- .../stats/StatsSampleOneMasterMain.java | 6 +- .../stats/StatsSampleSingleMasterSpec.scala | 14 +-- .../tutorial/index.html | 7 +- .../sample/cluster/stats/StatsFacade.scala | 48 ---------- .../cluster/stats/StatsSampleOneMaster.scala | 9 +- .../stats/StatsSampleSingleMasterSpec.scala | 10 +- .../tutorial/index.html | 7 +- 11 files changed, 41 insertions(+), 173 deletions(-) delete mode 100644 akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsFacade.java delete mode 100644 akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsFacade.scala diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index 287ad1e2c9..08933e27a0 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -519,14 +519,14 @@ in the contrib module. The ``ClusterSingletonManager`` is started on each node. .. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterMain.java#create-singleton-manager We also need an actor on each node that keeps track of where current single master exists and -delegates jobs to the ``StatsService``. +delegates jobs to the ``StatsService``. That is provided by the ``ClusterSingletonProxy``. -.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsFacade.java#facade +.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterMain.java#singleton-proxy -The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single +The ``ClusterSingletonProxy`` receives text from users and delegates to the current ``StatsService``, the single master. It listens to cluster events to lookup the ``StatsService`` on the oldest node. -All nodes start ``StatsFacade`` and the ``ClusterSingletonManager``. The router is now configured like this: +All nodes start ``ClusterSingletonProxy`` and the ``ClusterSingletonManager``. The router is now configured like this: .. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/resources/stats2.conf#config-router-deploy diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index 981c6fb4fe..210d6bcc8e 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -514,14 +514,14 @@ in the contrib module. The ``ClusterSingletonManager`` is started on each node. .. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala#create-singleton-manager We also need an actor on each node that keeps track of where current single master exists and -delegates jobs to the ``StatsService``. +delegates jobs to the ``StatsService``. That is provided by the ``ClusterSingletonProxy``. -.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsFacade.scala#facade +.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala#singleton-proxy -The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single +The ``ClusterSingletonProxy`` receives text from users and delegates to the current ``StatsService``, the single master. It listens to cluster events to lookup the ``StatsService`` on the oldest node. -All nodes start ``StatsFacade`` and the ``ClusterSingletonManager``. The router is now configured like this: +All nodes start ``ClusterSingletonProxy`` and the ``ClusterSingletonManager``. The router is now configured like this: .. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/resources/stats2.conf#config-router-deploy diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsFacade.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsFacade.java deleted file mode 100644 index 24a8403640..0000000000 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsFacade.java +++ /dev/null @@ -1,95 +0,0 @@ -package sample.cluster.stats; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; - -import sample.cluster.stats.StatsMessages.JobFailed; -import sample.cluster.stats.StatsMessages.StatsJob; -import akka.actor.ActorSelection; -import akka.actor.UntypedActor; -import akka.cluster.Cluster; -import akka.cluster.ClusterEvent.CurrentClusterState; -import akka.cluster.ClusterEvent.MemberEvent; -import akka.cluster.ClusterEvent.MemberUp; -import akka.cluster.ClusterEvent.MemberRemoved; -import akka.cluster.Member; -import akka.event.Logging; -import akka.event.LoggingAdapter; - -//#facade -public class StatsFacade extends UntypedActor { - - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - final Cluster cluster = Cluster.get(getContext().system()); - - final Comparator ageComparator = new Comparator() { - public int compare(Member a, Member b) { - if (a.isOlderThan(b)) - return -1; - else if (b.isOlderThan(a)) - return 1; - else - return 0; - } - }; - final SortedSet membersByAge = new TreeSet(ageComparator); - - //subscribe to cluster changes - @Override - public void preStart() { - cluster.subscribe(getSelf(), MemberEvent.class); - } - - //re-subscribe when restart - @Override - public void postStop() { - cluster.unsubscribe(getSelf()); - } - - @Override - public void onReceive(Object message) { - if (message instanceof StatsJob && membersByAge.isEmpty()) { - getSender().tell(new JobFailed("Service unavailable, try again later"), - getSelf()); - - } else if (message instanceof StatsJob) { - currentMaster().tell(message, getSender()); - - } else if (message instanceof CurrentClusterState) { - CurrentClusterState state = (CurrentClusterState) message; - List members = new ArrayList(); - for (Member m : state.getMembers()) { - if (m.hasRole("compute")) - members.add(m); - } - membersByAge.clear(); - membersByAge.addAll(members); - - } else if (message instanceof MemberUp) { - Member m = ((MemberUp) message).member(); - if (m.hasRole("compute")) - membersByAge.add(m); - - } else if (message instanceof MemberRemoved) { - Member m = ((MemberRemoved) message).member(); - if (m.hasRole("compute")) - membersByAge.remove(m); - - } else if (message instanceof MemberEvent) { - // not interesting - - } else { - unhandled(message); - } - } - - ActorSelection currentMaster() { - return getContext().actorSelection( - membersByAge.first().address() + "/user/singleton/statsService"); - } - -} -//#facade diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterClientMain.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterClientMain.java index a68238ce0b..7506687283 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterClientMain.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterClientMain.java @@ -11,7 +11,7 @@ public class StatsSampleOneMasterClientMain { // note that client is not a compute node, role not defined ActorSystem system = ActorSystem.create("ClusterSystem", ConfigFactory.load("stats2")); - system.actorOf(Props.create(StatsSampleClient.class, "/user/statsFacade"), + system.actorOf(Props.create(StatsSampleClient.class, "/user/statsServiceProxy"), "client"); } diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterMain.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterMain.java index e549d47704..5119d7fd4b 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterMain.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterMain.java @@ -7,6 +7,7 @@ import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; import akka.contrib.pattern.ClusterSingletonManager; +import akka.contrib.pattern.ClusterSingletonProxy; public class StatsSampleOneMasterMain { @@ -36,7 +37,10 @@ public class StatsSampleOneMasterMain { PoisonPill.getInstance(), "compute"), "singleton"); //#create-singleton-manager - system.actorOf(Props.create(StatsFacade.class), "statsFacade"); + //#singleton-proxy + system.actorOf(ClusterSingletonProxy.defaultProps("/user/singleton/statsService", + "compute"), "statsServiceProxy"); + //#singleton-proxy } } diff --git a/akka-samples/akka-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/akka-samples/akka-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala index d387dfbcc6..36f5a99e0f 100644 --- a/akka-samples/akka-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala +++ b/akka-samples/akka-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala @@ -15,6 +15,7 @@ import akka.cluster.MemberStatus import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.MemberUp import akka.contrib.pattern.ClusterSingletonManager +import akka.contrib.pattern.ClusterSingletonProxy import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit.ImplicitSender @@ -35,7 +36,7 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig { akka.cluster.roles = [compute] # don't use sigar for tests, native lib not in path akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector - #//#router-deploy-config + #//#router-deploy-config akka.actor.deployment { /singleton/statsService/workerRouter { router = consistent-hashing-pool @@ -48,7 +49,7 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig { } } } - #//#router-deploy-config + #//#router-deploy-config """)) } @@ -91,18 +92,19 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing terminationMessage = PoisonPill, role = null), name = "singleton") - system.actorOf(Props[StatsFacade], "statsFacade") + system.actorOf(ClusterSingletonProxy.defaultProps("/user/singleton/statsService", + "compute"), "statsServiceProxy"); testConductor.enter("all-up") } - "show usage of the statsFacade" in within(40 seconds) { - val facade = system.actorSelection(RootActorPath(node(third).address) / "user" / "statsFacade") + "show usage of the statsServiceProxy" in within(40 seconds) { + val proxy = system.actorSelection(RootActorPath(node(third).address) / "user" / "statsServiceProxy") // eventually the service should be ok, // service and worker nodes might not be up yet awaitAssert { - facade ! new StatsJob("this is the text that will be analyzed") + proxy ! new StatsJob("this is the text that will be analyzed") expectMsgType[StatsResult](1.second).getMeanWordLength should be(3.875 +- 0.001) } diff --git a/akka-samples/akka-sample-cluster-java/tutorial/index.html b/akka-samples/akka-sample-cluster-java/tutorial/index.html index b7762fe334..d2a88b01fb 100644 --- a/akka-samples/akka-sample-cluster-java/tutorial/index.html +++ b/akka-samples/akka-sample-cluster-java/tutorial/index.html @@ -352,17 +352,16 @@ in the contrib module. The ClusterSingletonManager is started on ea

We also need an actor on each node that keeps track of where current single master exists and -delegates jobs to the StatsService. That is handled by the -StatsFacade.java +delegates jobs to the StatsService. That is provided by the ClusterSingletonProxy.

-The StatsFacade receives text from users and delegates to the current StatsService, the single +The ClusterSingletonProxy receives text from users and delegates to the current StatsService, the single master. It listens to cluster events to lookup the StatsService on the oldest node.

-All nodes start StatsFacade and the ClusterSingletonManager. The router is now configured in +All nodes start ClusterSingletonProxy and the ClusterSingletonManager. The router is now configured in stats2.conf

diff --git a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsFacade.scala b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsFacade.scala deleted file mode 100644 index ccd2e99221..0000000000 --- a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsFacade.scala +++ /dev/null @@ -1,48 +0,0 @@ -package sample.cluster.stats - -import scala.collection.immutable -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.ActorSelection -import akka.actor.RootActorPath -import akka.cluster.Cluster -import akka.cluster.ClusterEvent.CurrentClusterState -import akka.cluster.ClusterEvent.MemberEvent -import akka.cluster.ClusterEvent.MemberRemoved -import akka.cluster.ClusterEvent.MemberUp -import akka.cluster.Member - -//#facade -class StatsFacade extends Actor with ActorLogging { - import context.dispatcher - val cluster = Cluster(context.system) - - // sort by age, oldest first - val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) } - var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) - - // subscribe to cluster changes - // re-subscribe when restart - override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) - override def postStop(): Unit = cluster.unsubscribe(self) - - def receive = { - case job: StatsJob if membersByAge.isEmpty => - sender() ! JobFailed("Service unavailable, try again later") - case job: StatsJob => - currentMaster.tell(job, sender()) - case state: CurrentClusterState => - membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect { - case m if m.hasRole("compute") => m - } - case MemberUp(m) => if (m.hasRole("compute")) membersByAge += m - case MemberRemoved(m, _) => if (m.hasRole("compute")) membersByAge -= m - case _: MemberEvent => // not interesting - } - - def currentMaster: ActorSelection = - context.actorSelection(RootActorPath(membersByAge.head.address) / - "user" / "singleton" / "statsService") - -} -//#facade \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala index 3fc7f5c8f1..91367604c3 100644 --- a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala +++ b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala @@ -5,6 +5,7 @@ import akka.actor.ActorSystem import akka.actor.PoisonPill import akka.actor.Props import akka.contrib.pattern.ClusterSingletonManager +import akka.contrib.pattern.ClusterSingletonProxy object StatsSampleOneMaster { def main(args: Array[String]): Unit = { @@ -32,7 +33,11 @@ object StatsSampleOneMaster { terminationMessage = PoisonPill, role = Some("compute")), name = "singleton") //#create-singleton-manager - system.actorOf(Props[StatsFacade], name = "statsFacade") + + //#singleton-proxy + system.actorOf(ClusterSingletonProxy.props(singletonPath = "/user/singleton/statsService", + role = Some("compute")), name = "statsServiceProxy") + //#singleton-proxy } } } @@ -41,7 +46,7 @@ object StatsSampleOneMasterClient { def main(args: Array[String]): Unit = { // note that client is not a compute node, role not defined val system = ActorSystem("ClusterSystem") - system.actorOf(Props(classOf[StatsSampleClient], "/user/statsFacade"), "client") + system.actorOf(Props(classOf[StatsSampleClient], "/user/statsServiceProxy"), "client") } } diff --git a/akka-samples/akka-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/akka-samples/akka-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala index e4a3062c32..01f517cc23 100644 --- a/akka-samples/akka-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala +++ b/akka-samples/akka-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala @@ -10,6 +10,7 @@ import akka.actor.PoisonPill import akka.actor.Props import akka.actor.RootActorPath import akka.contrib.pattern.ClusterSingletonManager +import akka.contrib.pattern.ClusterSingletonProxy import akka.cluster.Cluster import akka.cluster.Member import akka.cluster.MemberStatus @@ -88,18 +89,19 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing singletonProps = Props[StatsService], singletonName = "statsService", terminationMessage = PoisonPill, role = Some("compute")), name = "singleton") - system.actorOf(Props[StatsFacade], "statsFacade") + system.actorOf(ClusterSingletonProxy.props(singletonPath = "/user/singleton/statsService", + role = Some("compute")), name = "statsServiceProxy") testConductor.enter("all-up") } - "show usage of the statsFacade" in within(40 seconds) { - val facade = system.actorSelection(RootActorPath(node(third).address) / "user" / "statsFacade") + "show usage of the statsServiceProxy" in within(40 seconds) { + val proxy = system.actorSelection(RootActorPath(node(third).address) / "user" / "statsServiceProxy") // eventually the service should be ok, // service and worker nodes might not be up yet awaitAssert { - facade ! StatsJob("this is the text that will be analyzed") + proxy ! StatsJob("this is the text that will be analyzed") expectMsgType[StatsResult](1.second).meanWordLength should be( 3.875 +- 0.001) } diff --git a/akka-samples/akka-sample-cluster-scala/tutorial/index.html b/akka-samples/akka-sample-cluster-scala/tutorial/index.html index 4ea72e1a8e..6256a1f170 100644 --- a/akka-samples/akka-sample-cluster-scala/tutorial/index.html +++ b/akka-samples/akka-sample-cluster-scala/tutorial/index.html @@ -351,17 +351,16 @@ in the contrib module. The ClusterSingletonManager is started on ea

We also need an actor on each node that keeps track of where current single master exists and -delegates jobs to the StatsService. That is handled by the -StatsFacade.scala +delegates jobs to the StatsService. That is provided by the ClusterSingletonProxy.

-The StatsFacade receives text from users and delegates to the current StatsService, the single +The ClusterSingletonProxy receives text from users and delegates to the current StatsService, the single master. It listens to cluster events to lookup the StatsService on the oldest node.

-All nodes start StatsFacade and the ClusterSingletonManager. The router is now configured in +All nodes start ClusterSingletonProxy and the ClusterSingletonManager. The router is now configured in stats2.conf