=doc #3689 Make activator templates for cluster samples
This commit is contained in:
parent
b82698a354
commit
37f8f2831b
135 changed files with 2650 additions and 1461 deletions
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe <http://typesafe.com/>
|
||||
*/
|
||||
package sample.cluster.factorial;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.cluster.routing.AdaptiveLoadBalancingGroup;
|
||||
import akka.cluster.routing.AdaptiveLoadBalancingPool;
|
||||
import akka.cluster.routing.ClusterRouterGroup;
|
||||
import akka.cluster.routing.ClusterRouterGroupSettings;
|
||||
import akka.cluster.routing.ClusterRouterPool;
|
||||
import akka.cluster.routing.ClusterRouterPoolSettings;
|
||||
import akka.cluster.routing.HeapMetricsSelector;
|
||||
import akka.cluster.routing.SystemLoadAverageMetricsSelector;
|
||||
|
||||
//not used, only for documentation
|
||||
abstract class FactorialFrontend2 extends UntypedActor {
|
||||
//#router-lookup-in-code
|
||||
int totalInstances = 100;
|
||||
Iterable<String> routeesPaths = Arrays.asList("/user/factorialBackend", "");
|
||||
boolean allowLocalRoutees = true;
|
||||
String useRole = "backend";
|
||||
ActorRef backend = getContext().actorOf(
|
||||
new ClusterRouterGroup(new AdaptiveLoadBalancingGroup(
|
||||
HeapMetricsSelector.getInstance(), Collections.<String> emptyList()),
|
||||
new ClusterRouterGroupSettings(totalInstances, routeesPaths,
|
||||
allowLocalRoutees, useRole)).props(), "factorialBackendRouter2");
|
||||
//#router-lookup-in-code
|
||||
}
|
||||
|
||||
//not used, only for documentation
|
||||
abstract class FactorialFrontend3 extends UntypedActor {
|
||||
//#router-deploy-in-code
|
||||
int totalInstances = 100;
|
||||
int maxInstancesPerNode = 3;
|
||||
boolean allowLocalRoutees = false;
|
||||
String useRole = "backend";
|
||||
ActorRef backend = getContext().actorOf(
|
||||
new ClusterRouterPool(new AdaptiveLoadBalancingPool(
|
||||
SystemLoadAverageMetricsSelector.getInstance(), 0),
|
||||
new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode,
|
||||
allowLocalRoutees, useRole)).props(Props
|
||||
.create(FactorialBackend.class)), "factorialBackendRouter3");
|
||||
//#router-deploy-in-code
|
||||
}
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
package sample.cluster.factorial;
|
||||
|
||||
public class FactorialApp {
|
||||
|
||||
public static void main(String[] args) {
|
||||
// starting 3 backend nodes and 1 frontend node
|
||||
FactorialBackendMain.main(new String[] { "2551" });
|
||||
FactorialBackendMain.main(new String[] { "2552" });
|
||||
FactorialBackendMain.main(new String[0]);
|
||||
FactorialFrontendMain.main(new String[0]);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
package sample.cluster.factorial;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.util.concurrent.Callable;
|
||||
import scala.concurrent.Future;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.dispatch.Mapper;
|
||||
import static akka.dispatch.Futures.future;
|
||||
import static akka.pattern.Patterns.pipe;
|
||||
|
||||
//#backend
|
||||
public class FactorialBackend extends UntypedActor {
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof Integer) {
|
||||
final Integer n = (Integer) message;
|
||||
Future<BigInteger> f = future(new Callable<BigInteger>() {
|
||||
public BigInteger call() {
|
||||
return factorial(n);
|
||||
}
|
||||
}, getContext().dispatcher());
|
||||
|
||||
Future<FactorialResult> result = f.map(
|
||||
new Mapper<BigInteger, FactorialResult>() {
|
||||
public FactorialResult apply(BigInteger factorial) {
|
||||
return new FactorialResult(n, factorial);
|
||||
}
|
||||
}, getContext().dispatcher());
|
||||
|
||||
pipe(result, getContext().dispatcher()).to(getSender());
|
||||
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
BigInteger factorial(int n) {
|
||||
BigInteger acc = BigInteger.ONE;
|
||||
for (int i = 1; i <= n; ++i) {
|
||||
acc = acc.multiply(BigInteger.valueOf(i));
|
||||
}
|
||||
return acc;
|
||||
}
|
||||
}
|
||||
//#backend
|
||||
|
||||
|
|
@ -0,0 +1,25 @@
|
|||
package sample.cluster.factorial;
|
||||
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
|
||||
public class FactorialBackendMain {
|
||||
|
||||
public static void main(String[] args) {
|
||||
// Override the configuration of the port when specified as program argument
|
||||
final String port = args.length > 0 ? args[0] : "0";
|
||||
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
|
||||
withFallback(ConfigFactory.load("factorial"));
|
||||
|
||||
ActorSystem system = ActorSystem.create("ClusterSystem", config);
|
||||
|
||||
system.actorOf(Props.create(FactorialBackend.class), "factorialBackend");
|
||||
|
||||
system.actorOf(Props.create(MetricsListener.class), "metricsListener");
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,64 @@
|
|||
package sample.cluster.factorial;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ReceiveTimeout;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
import akka.routing.FromConfig;
|
||||
|
||||
//#frontend
|
||||
public class FactorialFrontend extends UntypedActor {
|
||||
final int upToN;
|
||||
final boolean repeat;
|
||||
|
||||
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||
|
||||
ActorRef backend = getContext().actorOf(FromConfig.getInstance().props(),
|
||||
"factorialBackendRouter");
|
||||
|
||||
public FactorialFrontend(int upToN, boolean repeat) {
|
||||
this.upToN = upToN;
|
||||
this.repeat = repeat;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
sendJobs();
|
||||
getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof FactorialResult) {
|
||||
FactorialResult result = (FactorialResult) message;
|
||||
if (result.n == upToN) {
|
||||
log.debug("{}! = {}", result.n, result.factorial);
|
||||
if (repeat)
|
||||
sendJobs();
|
||||
else
|
||||
getContext().stop(getSelf());
|
||||
}
|
||||
|
||||
} else if (message instanceof ReceiveTimeout) {
|
||||
log.info("Timeout");
|
||||
sendJobs();
|
||||
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
void sendJobs() {
|
||||
log.info("Starting batch of factorials up to [{}]", upToN);
|
||||
for (int n = 1; n <= upToN; n++) {
|
||||
backend.tell(n, getSelf());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//#frontend
|
||||
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
package sample.cluster.factorial;
|
||||
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
import akka.cluster.Cluster;
|
||||
|
||||
public class FactorialFrontendMain {
|
||||
|
||||
public static void main(String[] args) {
|
||||
final int upToN = 200;
|
||||
|
||||
final Config config = ConfigFactory.parseString(
|
||||
"akka.cluster.roles = [frontend]").withFallback(
|
||||
ConfigFactory.load("factorial"));
|
||||
|
||||
final ActorSystem system = ActorSystem.create("ClusterSystem", config);
|
||||
system.log().info(
|
||||
"Factorials will start when 2 backend members in the cluster.");
|
||||
//#registerOnUp
|
||||
Cluster.get(system).registerOnMemberUp(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
system.actorOf(Props.create(FactorialFrontend.class, upToN, true),
|
||||
"factorialFrontend");
|
||||
}
|
||||
});
|
||||
//#registerOnUp
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package sample.cluster.factorial;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.io.Serializable;
|
||||
|
||||
public class FactorialResult implements Serializable {
|
||||
public final int n;
|
||||
public final BigInteger factorial;
|
||||
|
||||
FactorialResult(int n, BigInteger factorial) {
|
||||
this.n = n;
|
||||
this.factorial = factorial;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
package sample.cluster.factorial;
|
||||
|
||||
//#metrics-listener
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.cluster.Cluster;
|
||||
import akka.cluster.ClusterEvent.ClusterMetricsChanged;
|
||||
import akka.cluster.ClusterEvent.CurrentClusterState;
|
||||
import akka.cluster.NodeMetrics;
|
||||
import akka.cluster.StandardMetrics;
|
||||
import akka.cluster.StandardMetrics.HeapMemory;
|
||||
import akka.cluster.StandardMetrics.Cpu;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
public class MetricsListener extends UntypedActor {
|
||||
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||
|
||||
Cluster cluster = Cluster.get(getContext().system());
|
||||
|
||||
//subscribe to ClusterMetricsChanged
|
||||
@Override
|
||||
public void preStart() {
|
||||
cluster.subscribe(getSelf(), ClusterMetricsChanged.class);
|
||||
}
|
||||
|
||||
//re-subscribe when restart
|
||||
@Override
|
||||
public void postStop() {
|
||||
cluster.unsubscribe(getSelf());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof ClusterMetricsChanged) {
|
||||
ClusterMetricsChanged clusterMetrics = (ClusterMetricsChanged) message;
|
||||
for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) {
|
||||
if (nodeMetrics.address().equals(cluster.selfAddress())) {
|
||||
logHeap(nodeMetrics);
|
||||
logCpu(nodeMetrics);
|
||||
}
|
||||
}
|
||||
|
||||
} else if (message instanceof CurrentClusterState) {
|
||||
// ignore
|
||||
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
void logHeap(NodeMetrics nodeMetrics) {
|
||||
HeapMemory heap = StandardMetrics.extractHeapMemory(nodeMetrics);
|
||||
if (heap != null) {
|
||||
log.info("Used heap: {} MB", ((double) heap.used()) / 1024 / 1024);
|
||||
}
|
||||
}
|
||||
|
||||
void logCpu(NodeMetrics nodeMetrics) {
|
||||
Cpu cpu = StandardMetrics.extractCpu(nodeMetrics);
|
||||
if (cpu != null && cpu.systemLoadAverage().isDefined()) {
|
||||
log.info("Load: {} ({} processors)", cpu.systemLoadAverage().get(),
|
||||
cpu.processors());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
//#metrics-listener
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
package sample.cluster.simple;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
|
||||
public class SimpleClusterApp {
|
||||
|
||||
public static void main(String[] args) {
|
||||
if (args.length == 0)
|
||||
startup(new String[] { "2551", "2552", "0" });
|
||||
else
|
||||
startup(args);
|
||||
}
|
||||
|
||||
public static void startup(String[] ports) {
|
||||
for (String port : ports) {
|
||||
// Override the configuration of the port
|
||||
Config config = ConfigFactory.parseString(
|
||||
"akka.remote.netty.tcp.port=" + port).withFallback(
|
||||
ConfigFactory.load());
|
||||
|
||||
// Create an Akka system
|
||||
ActorSystem system = ActorSystem.create("ClusterSystem", config);
|
||||
|
||||
// Create an actor that handles cluster domain events
|
||||
system.actorOf(Props.create(SimpleClusterListener.class),
|
||||
"clusterListener");
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
package sample.cluster.simple;
|
||||
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.cluster.Cluster;
|
||||
import akka.cluster.ClusterEvent.ClusterDomainEvent;
|
||||
import akka.cluster.ClusterEvent.CurrentClusterState;
|
||||
import akka.cluster.ClusterEvent.MemberUp;
|
||||
import akka.cluster.ClusterEvent.MemberRemoved;
|
||||
import akka.cluster.ClusterEvent.UnreachableMember;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
public class SimpleClusterListener extends UntypedActor {
|
||||
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||
Cluster cluster = Cluster.get(getContext().system());
|
||||
|
||||
//subscribe to cluster changes, MemberUp
|
||||
@Override
|
||||
public void preStart() {
|
||||
cluster.subscribe(getSelf(), ClusterDomainEvent.class);
|
||||
}
|
||||
|
||||
//re-subscribe when restart
|
||||
@Override
|
||||
public void postStop() {
|
||||
cluster.unsubscribe(getSelf());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof CurrentClusterState) {
|
||||
CurrentClusterState state = (CurrentClusterState) message;
|
||||
log.info("Current members: {}", state.members());
|
||||
|
||||
} else if (message instanceof MemberUp) {
|
||||
MemberUp mUp = (MemberUp) message;
|
||||
log.info("Member is Up: {}", mUp.member());
|
||||
|
||||
} else if (message instanceof UnreachableMember) {
|
||||
UnreachableMember mUnreachable = (UnreachableMember) message;
|
||||
log.info("Member detected as unreachable: {}", mUnreachable.member());
|
||||
|
||||
} else if (message instanceof MemberRemoved) {
|
||||
MemberRemoved mRemoved = (MemberRemoved) message;
|
||||
log.info("Member is Removed: {}", mRemoved.member());
|
||||
|
||||
} else if (message instanceof ClusterDomainEvent) {
|
||||
// ignore
|
||||
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
package sample.cluster.stats;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.cluster.routing.ClusterRouterGroup;
|
||||
import akka.cluster.routing.ClusterRouterGroupSettings;
|
||||
import akka.cluster.routing.ClusterRouterPool;
|
||||
import akka.cluster.routing.ClusterRouterPoolSettings;
|
||||
import akka.routing.ConsistentHashingGroup;
|
||||
import akka.routing.ConsistentHashingPool;
|
||||
|
||||
//not used, only for documentation
|
||||
abstract class StatsService2 extends UntypedActor {
|
||||
//#router-lookup-in-code
|
||||
int totalInstances = 100;
|
||||
Iterable<String> routeesPaths = Collections
|
||||
.singletonList("/user/statsWorker");
|
||||
boolean allowLocalRoutees = true;
|
||||
String useRole = "compute";
|
||||
ActorRef workerRouter = getContext().actorOf(
|
||||
new ClusterRouterGroup(new ConsistentHashingGroup(routeesPaths),
|
||||
new ClusterRouterGroupSettings(totalInstances, routeesPaths,
|
||||
allowLocalRoutees, useRole)).props(), "workerRouter2");
|
||||
//#router-lookup-in-code
|
||||
}
|
||||
|
||||
//not used, only for documentation
|
||||
abstract class StatsService3 extends UntypedActor {
|
||||
//#router-deploy-in-code
|
||||
int totalInstances = 100;
|
||||
int maxInstancesPerNode = 3;
|
||||
boolean allowLocalRoutees = false;
|
||||
String useRole = "compute";
|
||||
ActorRef workerRouter = getContext().actorOf(
|
||||
new ClusterRouterPool(new ConsistentHashingPool(0),
|
||||
new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode,
|
||||
allowLocalRoutees, useRole)).props(Props
|
||||
.create(StatsWorker.class)), "workerRouter3");
|
||||
//#router-deploy-in-code
|
||||
}
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
package sample.cluster.stats;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import sample.cluster.stats.StatsMessages.JobFailed;
|
||||
import sample.cluster.stats.StatsMessages.StatsResult;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ReceiveTimeout;
|
||||
import akka.actor.UntypedActor;
|
||||
|
||||
//#aggregator
|
||||
public class StatsAggregator extends UntypedActor {
|
||||
|
||||
final int expectedResults;
|
||||
final ActorRef replyTo;
|
||||
final List<Integer> results = new ArrayList<Integer>();
|
||||
|
||||
public StatsAggregator(int expectedResults, ActorRef replyTo) {
|
||||
this.expectedResults = expectedResults;
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
getContext().setReceiveTimeout(Duration.create(3, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof Integer) {
|
||||
Integer wordCount = (Integer) message;
|
||||
results.add(wordCount);
|
||||
if (results.size() == expectedResults) {
|
||||
int sum = 0;
|
||||
for (int c : results)
|
||||
sum += c;
|
||||
double meanWordLength = ((double) sum) / results.size();
|
||||
replyTo.tell(new StatsResult(meanWordLength), getSelf());
|
||||
getContext().stop(getSelf());
|
||||
}
|
||||
|
||||
} else if (message == ReceiveTimeout.getInstance()) {
|
||||
replyTo.tell(new JobFailed("Service unavailable, try again later"),
|
||||
getSelf());
|
||||
getContext().stop(getSelf());
|
||||
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
//#aggregator
|
||||
|
|
@ -0,0 +1,95 @@
|
|||
package sample.cluster.stats;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import sample.cluster.stats.StatsMessages.JobFailed;
|
||||
import sample.cluster.stats.StatsMessages.StatsJob;
|
||||
import akka.actor.ActorSelection;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.cluster.Cluster;
|
||||
import akka.cluster.ClusterEvent.CurrentClusterState;
|
||||
import akka.cluster.ClusterEvent.MemberEvent;
|
||||
import akka.cluster.ClusterEvent.MemberUp;
|
||||
import akka.cluster.ClusterEvent.MemberRemoved;
|
||||
import akka.cluster.Member;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
|
||||
//#facade
|
||||
public class StatsFacade extends UntypedActor {
|
||||
|
||||
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||
final Cluster cluster = Cluster.get(getContext().system());
|
||||
|
||||
final Comparator<Member> ageComparator = new Comparator<Member>() {
|
||||
public int compare(Member a, Member b) {
|
||||
if (a.isOlderThan(b))
|
||||
return -1;
|
||||
else if (b.isOlderThan(a))
|
||||
return 1;
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
final SortedSet<Member> membersByAge = new TreeSet<Member>(ageComparator);
|
||||
|
||||
//subscribe to cluster changes
|
||||
@Override
|
||||
public void preStart() {
|
||||
cluster.subscribe(getSelf(), MemberEvent.class);
|
||||
}
|
||||
|
||||
//re-subscribe when restart
|
||||
@Override
|
||||
public void postStop() {
|
||||
cluster.unsubscribe(getSelf());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof StatsJob && membersByAge.isEmpty()) {
|
||||
getSender().tell(new JobFailed("Service unavailable, try again later"),
|
||||
getSelf());
|
||||
|
||||
} else if (message instanceof StatsJob) {
|
||||
currentMaster().tell(message, getSender());
|
||||
|
||||
} else if (message instanceof CurrentClusterState) {
|
||||
CurrentClusterState state = (CurrentClusterState) message;
|
||||
List<Member> members = new ArrayList<Member>();
|
||||
for (Member m : state.getMembers()) {
|
||||
if (m.hasRole("compute"))
|
||||
members.add(m);
|
||||
}
|
||||
membersByAge.clear();
|
||||
membersByAge.addAll(members);
|
||||
|
||||
} else if (message instanceof MemberUp) {
|
||||
Member m = ((MemberUp) message).member();
|
||||
if (m.hasRole("compute"))
|
||||
membersByAge.add(m);
|
||||
|
||||
} else if (message instanceof MemberRemoved) {
|
||||
Member m = ((MemberRemoved) message).member();
|
||||
if (m.hasRole("compute"))
|
||||
membersByAge.remove(m);
|
||||
|
||||
} else if (message instanceof MemberEvent) {
|
||||
// not interesting
|
||||
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
ActorSelection currentMaster() {
|
||||
return getContext().actorSelection(
|
||||
membersByAge.first().address() + "/user/singleton/statsService");
|
||||
}
|
||||
|
||||
}
|
||||
//#facade
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
package sample.cluster.stats;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
//#messages
|
||||
public interface StatsMessages {
|
||||
|
||||
public static class StatsJob implements Serializable {
|
||||
private final String text;
|
||||
|
||||
public StatsJob(String text) {
|
||||
this.text = text;
|
||||
}
|
||||
|
||||
public String getText() {
|
||||
return text;
|
||||
}
|
||||
}
|
||||
|
||||
public static class StatsResult implements Serializable {
|
||||
private final double meanWordLength;
|
||||
|
||||
public StatsResult(double meanWordLength) {
|
||||
this.meanWordLength = meanWordLength;
|
||||
}
|
||||
|
||||
public double getMeanWordLength() {
|
||||
return meanWordLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "meanWordLength: " + meanWordLength;
|
||||
}
|
||||
}
|
||||
|
||||
public static class JobFailed implements Serializable {
|
||||
private final String reason;
|
||||
|
||||
public JobFailed(String reason) {
|
||||
this.reason = reason;
|
||||
}
|
||||
|
||||
public String getReason() {
|
||||
return reason;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "JobFailed(" + reason + ")";
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
//#messages
|
||||
|
|
@ -0,0 +1,99 @@
|
|||
package sample.cluster.stats;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import sample.cluster.stats.StatsMessages.JobFailed;
|
||||
import sample.cluster.stats.StatsMessages.StatsJob;
|
||||
import sample.cluster.stats.StatsMessages.StatsResult;
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
import akka.actor.ActorSelection;
|
||||
import akka.actor.Address;
|
||||
import akka.actor.Cancellable;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.cluster.Cluster;
|
||||
import akka.cluster.ClusterEvent.CurrentClusterState;
|
||||
import akka.cluster.ClusterEvent.MemberEvent;
|
||||
import akka.cluster.ClusterEvent.MemberUp;
|
||||
import akka.cluster.Member;
|
||||
import akka.cluster.MemberStatus;
|
||||
|
||||
public class StatsSampleClient extends UntypedActor {
|
||||
|
||||
final String servicePath;
|
||||
final Cancellable tickTask;
|
||||
final Set<Address> nodes = new HashSet<Address>();
|
||||
|
||||
Cluster cluster = Cluster.get(getContext().system());
|
||||
|
||||
public StatsSampleClient(String servicePath) {
|
||||
this.servicePath = servicePath;
|
||||
FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
|
||||
tickTask = getContext()
|
||||
.system()
|
||||
.scheduler()
|
||||
.schedule(interval, interval, getSelf(), "tick",
|
||||
getContext().dispatcher(), null);
|
||||
}
|
||||
|
||||
//subscribe to cluster changes, MemberEvent
|
||||
@Override
|
||||
public void preStart() {
|
||||
cluster.subscribe(getSelf(), MemberEvent.class);
|
||||
}
|
||||
|
||||
//re-subscribe when restart
|
||||
@Override
|
||||
public void postStop() {
|
||||
cluster.unsubscribe(getSelf());
|
||||
tickTask.cancel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message.equals("tick") && !nodes.isEmpty()) {
|
||||
// just pick any one
|
||||
List<Address> nodesList = new ArrayList<Address>(nodes);
|
||||
Address address = nodesList.get(ThreadLocalRandom.current().nextInt(
|
||||
nodesList.size()));
|
||||
ActorSelection service = getContext().actorSelection(address + servicePath);
|
||||
service.tell(new StatsJob("this is the text that will be analyzed"),
|
||||
getSelf());
|
||||
|
||||
} else if (message instanceof StatsResult) {
|
||||
StatsResult result = (StatsResult) message;
|
||||
System.out.println(result);
|
||||
|
||||
} else if (message instanceof JobFailed) {
|
||||
JobFailed failed = (JobFailed) message;
|
||||
System.out.println(failed);
|
||||
|
||||
} else if (message instanceof CurrentClusterState) {
|
||||
CurrentClusterState state = (CurrentClusterState) message;
|
||||
nodes.clear();
|
||||
for (Member member : state.getMembers()) {
|
||||
if (member.hasRole("compute") && member.status().equals(MemberStatus.up())) {
|
||||
nodes.add(member.address());
|
||||
}
|
||||
}
|
||||
|
||||
} else if (message instanceof MemberUp) {
|
||||
MemberUp mUp = (MemberUp) message;
|
||||
if (mUp.member().hasRole("compute"))
|
||||
nodes.add(mUp.member().address());
|
||||
|
||||
} else if (message instanceof MemberEvent) {
|
||||
MemberEvent other = (MemberEvent) message;
|
||||
nodes.remove(other.member().address());
|
||||
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
package sample.cluster.stats;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
|
||||
public class StatsSampleClientMain {
|
||||
|
||||
public static void main(String[] args) {
|
||||
// note that client is not a compute node, role not defined
|
||||
ActorSystem system = ActorSystem.create("ClusterSystem",
|
||||
ConfigFactory.load("stats1"));
|
||||
system.actorOf(Props.create(StatsSampleClient.class, "/user/statsService"),
|
||||
"client");
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
package sample.cluster.stats;
|
||||
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
|
||||
public class StatsSampleMain {
|
||||
|
||||
public static void main(String[] args) {
|
||||
if (args.length == 0) {
|
||||
startup(new String[] { "2551", "2552", "0" });
|
||||
StatsSampleClientMain.main(new String[0]);
|
||||
} else {
|
||||
startup(args);
|
||||
}
|
||||
}
|
||||
|
||||
public static void startup(String[] ports) {
|
||||
for (String port : ports) {
|
||||
// Override the configuration of the port
|
||||
Config config = ConfigFactory
|
||||
.parseString("akka.remote.netty.tcp.port=" + port)
|
||||
.withFallback(
|
||||
ConfigFactory.parseString("akka.cluster.roles = [compute]"))
|
||||
.withFallback(ConfigFactory.load("stats1"));
|
||||
|
||||
ActorSystem system = ActorSystem.create("ClusterSystem", config);
|
||||
|
||||
system.actorOf(Props.create(StatsWorker.class), "statsWorker");
|
||||
system.actorOf(Props.create(StatsService.class), "statsService");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
package sample.cluster.stats;
|
||||
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
|
||||
public class StatsSampleOneMasterClientMain {
|
||||
|
||||
public static void main(String[] args) {
|
||||
// note that client is not a compute node, role not defined
|
||||
ActorSystem system = ActorSystem.create("ClusterSystem",
|
||||
ConfigFactory.load("stats2"));
|
||||
system.actorOf(Props.create(StatsSampleClient.class, "/user/statsFacade"),
|
||||
"client");
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
package sample.cluster.stats;
|
||||
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.PoisonPill;
|
||||
import akka.actor.Props;
|
||||
import akka.contrib.pattern.ClusterSingletonManager;
|
||||
|
||||
public class StatsSampleOneMasterMain {
|
||||
|
||||
public static void main(String[] args) {
|
||||
if (args.length == 0) {
|
||||
startup(new String[] { "2551", "2552", "0" });
|
||||
StatsSampleOneMasterClientMain.main(new String[0]);
|
||||
} else {
|
||||
startup(args);
|
||||
}
|
||||
}
|
||||
|
||||
public static void startup(String[] ports) {
|
||||
for (String port : ports) {
|
||||
// Override the configuration of the port
|
||||
Config config = ConfigFactory
|
||||
.parseString("akka.remote.netty.tcp.port=" + port)
|
||||
.withFallback(
|
||||
ConfigFactory.parseString("akka.cluster.roles = [compute]"))
|
||||
.withFallback(ConfigFactory.load("stats2"));
|
||||
|
||||
ActorSystem system = ActorSystem.create("ClusterSystem", config);
|
||||
|
||||
//#create-singleton-manager
|
||||
system.actorOf(ClusterSingletonManager.defaultProps(
|
||||
Props.create(StatsService.class), "statsService",
|
||||
PoisonPill.getInstance(), "compute"), "singleton");
|
||||
//#create-singleton-manager
|
||||
|
||||
system.actorOf(Props.create(StatsFacade.class), "statsFacade");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,48 @@
|
|||
package sample.cluster.stats;
|
||||
|
||||
import sample.cluster.stats.StatsMessages.StatsJob;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope;
|
||||
import akka.routing.FromConfig;
|
||||
|
||||
//#service
|
||||
public class StatsService extends UntypedActor {
|
||||
|
||||
// This router is used both with lookup and deploy of routees. If you
|
||||
// have a router with only lookup of routees you can use Props.empty()
|
||||
// instead of Props.create(StatsWorker.class).
|
||||
ActorRef workerRouter = getContext().actorOf(
|
||||
FromConfig.getInstance().props(Props.create(StatsWorker.class)),
|
||||
"workerRouter");
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof StatsJob) {
|
||||
StatsJob job = (StatsJob) message;
|
||||
if (job.getText().equals("")) {
|
||||
unhandled(message);
|
||||
} else {
|
||||
final String[] words = job.getText().split(" ");
|
||||
final ActorRef replyTo = getSender();
|
||||
|
||||
// create actor that collects replies from workers
|
||||
ActorRef aggregator = getContext().actorOf(
|
||||
Props.create(StatsAggregator.class, words.length, replyTo));
|
||||
|
||||
// send each word to a worker
|
||||
for (String word : words) {
|
||||
workerRouter.tell(new ConsistentHashableEnvelope(word, word),
|
||||
aggregator);
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//#service
|
||||
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
package sample.cluster.stats;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import akka.actor.UntypedActor;
|
||||
|
||||
//#worker
|
||||
public class StatsWorker extends UntypedActor {
|
||||
|
||||
Map<String, Integer> cache = new HashMap<String, Integer>();
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof String) {
|
||||
String word = (String) message;
|
||||
Integer length = cache.get(word);
|
||||
if (length == null) {
|
||||
length = word.length();
|
||||
cache.put(word, length);
|
||||
}
|
||||
getSender().tell(length, getSelf());
|
||||
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
//#worker
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
package sample.cluster.transformation;
|
||||
|
||||
public class TransformationApp {
|
||||
|
||||
public static void main(String[] args) {
|
||||
// starting 2 frontend nodes and 3 backend nodes
|
||||
TransformationBackendMain.main(new String[] { "2551" });
|
||||
TransformationBackendMain.main(new String[] { "2552" });
|
||||
TransformationBackendMain.main(new String[0]);
|
||||
TransformationFrontendMain.main(new String[0]);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
package sample.cluster.transformation;
|
||||
|
||||
import static sample.cluster.transformation.TransformationMessages.BACKEND_REGISTRATION;
|
||||
import sample.cluster.transformation.TransformationMessages.TransformationJob;
|
||||
import sample.cluster.transformation.TransformationMessages.TransformationResult;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.cluster.Cluster;
|
||||
import akka.cluster.ClusterEvent.CurrentClusterState;
|
||||
import akka.cluster.ClusterEvent.MemberUp;
|
||||
import akka.cluster.Member;
|
||||
import akka.cluster.MemberStatus;
|
||||
|
||||
//#backend
|
||||
public class TransformationBackend extends UntypedActor {
|
||||
|
||||
Cluster cluster = Cluster.get(getContext().system());
|
||||
|
||||
//subscribe to cluster changes, MemberUp
|
||||
@Override
|
||||
public void preStart() {
|
||||
cluster.subscribe(getSelf(), MemberUp.class);
|
||||
}
|
||||
|
||||
//re-subscribe when restart
|
||||
@Override
|
||||
public void postStop() {
|
||||
cluster.unsubscribe(getSelf());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof TransformationJob) {
|
||||
TransformationJob job = (TransformationJob) message;
|
||||
getSender().tell(new TransformationResult(job.getText().toUpperCase()),
|
||||
getSelf());
|
||||
|
||||
} else if (message instanceof CurrentClusterState) {
|
||||
CurrentClusterState state = (CurrentClusterState) message;
|
||||
for (Member member : state.getMembers()) {
|
||||
if (member.status().equals(MemberStatus.up())) {
|
||||
register(member);
|
||||
}
|
||||
}
|
||||
|
||||
} else if (message instanceof MemberUp) {
|
||||
MemberUp mUp = (MemberUp) message;
|
||||
register(mUp.member());
|
||||
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
void register(Member member) {
|
||||
if (member.hasRole("frontend"))
|
||||
getContext().actorSelection(member.address() + "/user/frontend").tell(
|
||||
BACKEND_REGISTRATION, getSelf());
|
||||
}
|
||||
}
|
||||
//#backend
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
package sample.cluster.transformation;
|
||||
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
|
||||
public class TransformationBackendMain {
|
||||
|
||||
public static void main(String[] args) {
|
||||
// Override the configuration of the port when specified as program argument
|
||||
final String port = args.length > 0 ? args[0] : "0";
|
||||
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
|
||||
withFallback(ConfigFactory.load());
|
||||
|
||||
ActorSystem system = ActorSystem.create("ClusterSystem", config);
|
||||
|
||||
system.actorOf(Props.create(TransformationBackend.class), "backend");
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,48 @@
|
|||
package sample.cluster.transformation;
|
||||
|
||||
import static sample.cluster.transformation.TransformationMessages.BACKEND_REGISTRATION;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import sample.cluster.transformation.TransformationMessages.JobFailed;
|
||||
import sample.cluster.transformation.TransformationMessages.TransformationJob;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Terminated;
|
||||
import akka.actor.UntypedActor;
|
||||
|
||||
//#frontend
|
||||
public class TransformationFrontend extends UntypedActor {
|
||||
|
||||
List<ActorRef> backends = new ArrayList<ActorRef>();
|
||||
int jobCounter = 0;
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if ((message instanceof TransformationJob) && backends.isEmpty()) {
|
||||
TransformationJob job = (TransformationJob) message;
|
||||
getSender().tell(
|
||||
new JobFailed("Service unavailable, try again later", job),
|
||||
getSender());
|
||||
|
||||
} else if (message instanceof TransformationJob) {
|
||||
TransformationJob job = (TransformationJob) message;
|
||||
jobCounter++;
|
||||
backends.get(jobCounter % backends.size())
|
||||
.forward(job, getContext());
|
||||
|
||||
} else if (message.equals(BACKEND_REGISTRATION)) {
|
||||
getContext().watch(getSender());
|
||||
backends.add(getSender());
|
||||
|
||||
} else if (message instanceof Terminated) {
|
||||
Terminated terminated = (Terminated) message;
|
||||
backends.remove(terminated.getActor());
|
||||
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
//#frontend
|
||||
|
|
@ -0,0 +1,51 @@
|
|||
package sample.cluster.transformation;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
|
||||
import sample.cluster.transformation.TransformationMessages.TransformationJob;
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
import akka.dispatch.OnSuccess;
|
||||
import akka.util.Timeout;
|
||||
import static akka.pattern.Patterns.ask;
|
||||
|
||||
public class TransformationFrontendMain {
|
||||
|
||||
public static void main(String[] args) {
|
||||
// Override the configuration of the port when specified as program argument
|
||||
final String port = args.length > 0 ? args[0] : "0";
|
||||
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
|
||||
withFallback(ConfigFactory.load());
|
||||
|
||||
ActorSystem system = ActorSystem.create("ClusterSystem", config);
|
||||
|
||||
final ActorRef frontend = system.actorOf(
|
||||
Props.create(TransformationFrontend.class), "frontend");
|
||||
final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
|
||||
final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
|
||||
final ExecutionContext ec = system.dispatcher();
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
system.scheduler().schedule(interval, interval, new Runnable() {
|
||||
public void run() {
|
||||
ask(frontend,
|
||||
new TransformationJob("hello-" + counter.incrementAndGet()),
|
||||
timeout).onSuccess(new OnSuccess<Object>() {
|
||||
public void onSuccess(Object result) {
|
||||
System.out.println(result);
|
||||
}
|
||||
}, ec);
|
||||
}
|
||||
|
||||
}, ec);
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,63 @@
|
|||
package sample.cluster.transformation;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
//#messages
|
||||
public interface TransformationMessages {
|
||||
|
||||
public static class TransformationJob implements Serializable {
|
||||
private final String text;
|
||||
|
||||
public TransformationJob(String text) {
|
||||
this.text = text;
|
||||
}
|
||||
|
||||
public String getText() {
|
||||
return text;
|
||||
}
|
||||
}
|
||||
|
||||
public static class TransformationResult implements Serializable {
|
||||
private final String text;
|
||||
|
||||
public TransformationResult(String text) {
|
||||
this.text = text;
|
||||
}
|
||||
|
||||
public String getText() {
|
||||
return text;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TransformationResult(" + text + ")";
|
||||
}
|
||||
}
|
||||
|
||||
public static class JobFailed implements Serializable {
|
||||
private final String reason;
|
||||
private final TransformationJob job;
|
||||
|
||||
public JobFailed(String reason, TransformationJob job) {
|
||||
this.reason = reason;
|
||||
this.job = job;
|
||||
}
|
||||
|
||||
public String getReason() {
|
||||
return reason;
|
||||
}
|
||||
|
||||
public TransformationJob getJob() {
|
||||
return job;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "JobFailed(" + reason + ")";
|
||||
}
|
||||
}
|
||||
|
||||
public static final String BACKEND_REGISTRATION = "BackendRegistration";
|
||||
|
||||
}
|
||||
//#messages
|
||||
Loading…
Add table
Add a link
Reference in a new issue