=sam #3843 Use ClusterSingletonProxy in cluster samples

This commit is contained in:
Patrik Nordwall 2014-03-14 16:32:54 +01:00
parent f457e0a30c
commit ad18405877
11 changed files with 41 additions and 173 deletions

View file

@ -1,95 +0,0 @@
package sample.cluster.stats;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import sample.cluster.stats.StatsMessages.JobFailed;
import sample.cluster.stats.StatsMessages.StatsJob;
import akka.actor.ActorSelection;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.Member;
import akka.event.Logging;
import akka.event.LoggingAdapter;
//#facade
public class StatsFacade extends UntypedActor {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
final Cluster cluster = Cluster.get(getContext().system());
final Comparator<Member> ageComparator = new Comparator<Member>() {
public int compare(Member a, Member b) {
if (a.isOlderThan(b))
return -1;
else if (b.isOlderThan(a))
return 1;
else
return 0;
}
};
final SortedSet<Member> membersByAge = new TreeSet<Member>(ageComparator);
//subscribe to cluster changes
@Override
public void preStart() {
cluster.subscribe(getSelf(), MemberEvent.class);
}
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof StatsJob && membersByAge.isEmpty()) {
getSender().tell(new JobFailed("Service unavailable, try again later"),
getSelf());
} else if (message instanceof StatsJob) {
currentMaster().tell(message, getSender());
} else if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
List<Member> members = new ArrayList<Member>();
for (Member m : state.getMembers()) {
if (m.hasRole("compute"))
members.add(m);
}
membersByAge.clear();
membersByAge.addAll(members);
} else if (message instanceof MemberUp) {
Member m = ((MemberUp) message).member();
if (m.hasRole("compute"))
membersByAge.add(m);
} else if (message instanceof MemberRemoved) {
Member m = ((MemberRemoved) message).member();
if (m.hasRole("compute"))
membersByAge.remove(m);
} else if (message instanceof MemberEvent) {
// not interesting
} else {
unhandled(message);
}
}
ActorSelection currentMaster() {
return getContext().actorSelection(
membersByAge.first().address() + "/user/singleton/statsService");
}
}
//#facade

View file

@ -11,7 +11,7 @@ public class StatsSampleOneMasterClientMain {
// note that client is not a compute node, role not defined
ActorSystem system = ActorSystem.create("ClusterSystem",
ConfigFactory.load("stats2"));
system.actorOf(Props.create(StatsSampleClient.class, "/user/statsFacade"),
system.actorOf(Props.create(StatsSampleClient.class, "/user/statsServiceProxy"),
"client");
}

View file

@ -7,6 +7,7 @@ import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.contrib.pattern.ClusterSingletonManager;
import akka.contrib.pattern.ClusterSingletonProxy;
public class StatsSampleOneMasterMain {
@ -36,7 +37,10 @@ public class StatsSampleOneMasterMain {
PoisonPill.getInstance(), "compute"), "singleton");
//#create-singleton-manager
system.actorOf(Props.create(StatsFacade.class), "statsFacade");
//#singleton-proxy
system.actorOf(ClusterSingletonProxy.defaultProps("/user/singleton/statsService",
"compute"), "statsServiceProxy");
//#singleton-proxy
}
}

View file

@ -15,6 +15,7 @@ import akka.cluster.MemberStatus
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import akka.contrib.pattern.ClusterSingletonManager
import akka.contrib.pattern.ClusterSingletonProxy
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender
@ -35,7 +36,7 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig {
akka.cluster.roles = [compute]
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
#//#router-deploy-config
#//#router-deploy-config
akka.actor.deployment {
/singleton/statsService/workerRouter {
router = consistent-hashing-pool
@ -48,7 +49,7 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig {
}
}
}
#//#router-deploy-config
#//#router-deploy-config
"""))
}
@ -91,18 +92,19 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing
terminationMessage = PoisonPill,
role = null), name = "singleton")
system.actorOf(Props[StatsFacade], "statsFacade")
system.actorOf(ClusterSingletonProxy.defaultProps("/user/singleton/statsService",
"compute"), "statsServiceProxy");
testConductor.enter("all-up")
}
"show usage of the statsFacade" in within(40 seconds) {
val facade = system.actorSelection(RootActorPath(node(third).address) / "user" / "statsFacade")
"show usage of the statsServiceProxy" in within(40 seconds) {
val proxy = system.actorSelection(RootActorPath(node(third).address) / "user" / "statsServiceProxy")
// eventually the service should be ok,
// service and worker nodes might not be up yet
awaitAssert {
facade ! new StatsJob("this is the text that will be analyzed")
proxy ! new StatsJob("this is the text that will be analyzed")
expectMsgType[StatsResult](1.second).getMeanWordLength should be(3.875 +- 0.001)
}