Convert UntypedActor to AbstractActor #22308

This commit is contained in:
ortigali 2017-02-23 00:50:07 +05:00
parent 94afbee179
commit a175180b44
13 changed files with 217 additions and 264 deletions

View file

@ -28,8 +28,11 @@ public class CustomRouteTest extends JUnitSuite {
private ActorSystem system = null;
private Camel camel = null;
public static class MyActor extends UntypedActor {
@Override public void onReceive(Object o) {}
public static class MyActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().build();
}
}
@Before

View file

@ -1,38 +1,33 @@
package docs.cluster;
import java.math.BigInteger;
import java.util.concurrent.Callable;
import scala.concurrent.Future;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.dispatch.Mapper;
import static akka.dispatch.Futures.future;
import static akka.pattern.Patterns.pipe;
//#backend
public class FactorialBackend extends UntypedActor {
public class FactorialBackend extends AbstractActor {
@Override
public void onReceive(Object message) {
if (message instanceof Integer) {
final Integer n = (Integer) message;
Future<BigInteger> f = future(new Callable<BigInteger>() {
public BigInteger call() {
return factorial(n);
}
}, getContext().dispatcher());
public Receive createReceive() {
return receiveBuilder()
.match(Integer.class, n -> {
Future<BigInteger> f = future(() -> factorial(n),
getContext().dispatcher());
Future<FactorialResult> result = f.map(
Future<FactorialResult> result = f.map(
new Mapper<BigInteger, FactorialResult>() {
public FactorialResult apply(BigInteger factorial) {
return new FactorialResult(n, factorial);
}
}, getContext().dispatcher());
pipe(result, getContext().dispatcher()).to(getSender());
pipe(result, getContext().dispatcher()).to(sender());
} else {
unhandled(message);
}
})
.build();
}
BigInteger factorial(int n) {

View file

@ -13,18 +13,16 @@ import akka.cluster.routing.ClusterRouterGroup;
import akka.cluster.routing.ClusterRouterGroupSettings;
import akka.cluster.routing.ClusterRouterPool;
import akka.cluster.routing.ClusterRouterPoolSettings;
import akka.routing.ConsistentHashingGroup;
import akka.routing.ConsistentHashingPool;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.routing.FromConfig;
//#frontend
public class FactorialFrontend extends UntypedActor {
public class FactorialFrontend extends AbstractActor {
final int upToN;
final boolean repeat;
@ -45,30 +43,28 @@ public class FactorialFrontend extends UntypedActor {
}
@Override
public void onReceive(Object message) {
if (message instanceof FactorialResult) {
FactorialResult result = (FactorialResult) message;
if (result.n == upToN) {
log.debug("{}! = {}", result.n, result.factorial);
if (repeat)
sendJobs();
else
getContext().stop(getSelf());
}
} else if (message instanceof ReceiveTimeout) {
log.info("Timeout");
sendJobs();
} else {
unhandled(message);
}
public Receive createReceive() {
return receiveBuilder()
.match(FactorialResult.class, result -> {
if (result.n == upToN) {
log.debug("{}! = {}", result.n, result.factorial);
if (repeat)
sendJobs();
else
getContext().stop(self());
}
})
.match(ReceiveTimeout.class, x -> {
log.info("Timeout");
sendJobs();
})
.build();
}
void sendJobs() {
log.info("Starting batch of factorials up to [{}]", upToN);
for (int n = 1; n <= upToN; n++) {
backend.tell(n, getSelf());
backend.tell(n, self());
}
}
@ -76,7 +72,7 @@ public class FactorialFrontend extends UntypedActor {
//#frontend
//not used, only for documentation
abstract class FactorialFrontend2 extends UntypedActor {
abstract class FactorialFrontend2 extends AbstractActor {
//#router-lookup-in-code
int totalInstances = 100;
Iterable<String> routeesPaths = Arrays.asList("/user/factorialBackend", "");
@ -91,7 +87,7 @@ abstract class FactorialFrontend2 extends UntypedActor {
}
//not used, only for documentation
abstract class FactorialFrontend3 extends UntypedActor {
abstract class FactorialFrontend3 extends AbstractActor {
//#router-deploy-in-code
int totalInstances = 100;
int maxInstancesPerNode = 3;

View file

@ -1,7 +1,7 @@
package docs.cluster;
//#metrics-listener
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.metrics.ClusterMetricsChanged;
@ -13,7 +13,7 @@ import akka.cluster.metrics.ClusterMetricsExtension;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MetricsListener extends UntypedActor {
public class MetricsListener extends AbstractActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system());
@ -33,23 +33,21 @@ public class MetricsListener extends UntypedActor {
extension.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof ClusterMetricsChanged) {
ClusterMetricsChanged clusterMetrics = (ClusterMetricsChanged) message;
for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) {
if (nodeMetrics.address().equals(cluster.selfAddress())) {
logHeap(nodeMetrics);
logCpu(nodeMetrics);
public Receive createReceive() {
return receiveBuilder()
.match(ClusterMetricsChanged.class, clusterMetrics -> {
for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) {
if (nodeMetrics.address().equals(cluster.selfAddress())) {
logHeap(nodeMetrics);
logCpu(nodeMetrics);
}
}
}
} else if (message instanceof CurrentClusterState) {
// Ignore.
} else {
unhandled(message);
}
})
.match(CurrentClusterState.class, message -> {
// Ignore.
})
.build();
}
void logHeap(NodeMetrics nodeMetrics) {

View file

@ -1,6 +1,6 @@
package docs.cluster;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberEvent;
@ -10,7 +10,7 @@ import akka.cluster.ClusterEvent.UnreachableMember;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class SimpleClusterListener extends UntypedActor {
public class SimpleClusterListener extends AbstractActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system());
@ -30,25 +30,20 @@ public class SimpleClusterListener extends UntypedActor {
}
@Override
public void onReceive(Object message) {
if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
log.info("Member is Up: {}", mUp.member());
} else if (message instanceof UnreachableMember) {
UnreachableMember mUnreachable = (UnreachableMember) message;
log.info("Member detected as unreachable: {}", mUnreachable.member());
} else if (message instanceof MemberRemoved) {
MemberRemoved mRemoved = (MemberRemoved) message;
log.info("Member is Removed: {}", mRemoved.member());
} else if (message instanceof MemberEvent) {
// ignore
} else {
unhandled(message);
}
public Receive createReceive() {
return receiveBuilder()
.match(MemberUp.class, mUp -> {
log.info("Member is Up: {}", mUp.member());
})
.match(UnreachableMember.class, mUnreachable -> {
log.info("Member detected as unreachable: {}", mUnreachable.member());
})
.match(MemberRemoved.class, mRemoved -> {
log.info("Member is Removed: {}", mRemoved.member());
})
.match(MemberEvent.class, message -> {
// ignore
})
.build();
}
}

View file

@ -1,6 +1,6 @@
package docs.cluster;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberEvent;
@ -10,7 +10,7 @@ import akka.cluster.ClusterEvent.UnreachableMember;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class SimpleClusterListener2 extends UntypedActor {
public class SimpleClusterListener2 extends AbstractActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system());
@ -29,29 +29,23 @@ public class SimpleClusterListener2 extends UntypedActor {
}
@Override
public void onReceive(Object message) {
if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
log.info("Current members: {}", state.members());
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
log.info("Member is Up: {}", mUp.member());
} else if (message instanceof UnreachableMember) {
UnreachableMember mUnreachable = (UnreachableMember) message;
log.info("Member detected as unreachable: {}", mUnreachable.member());
} else if (message instanceof MemberRemoved) {
MemberRemoved mRemoved = (MemberRemoved) message;
log.info("Member is Removed: {}", mRemoved.member());
} else if (message instanceof MemberEvent) {
// ignore
} else {
unhandled(message);
}
public Receive createReceive() {
return receiveBuilder()
.match(CurrentClusterState.class, state -> {
log.info("Current members: {}", state.members());
})
.match(MemberUp.class, mUp -> {
log.info("Member is Up: {}", mUp.member());
})
.match(UnreachableMember.class, mUnreachable -> {
log.info("Member detected as unreachable: {}", mUnreachable.member());
})
.match(MemberRemoved.class, mRemoved -> {
log.info("Member is Removed: {}", mRemoved.member());
})
.match(MemberEvent.class, event -> {
// ignore
})
.build();
}
}

View file

@ -9,10 +9,10 @@ import docs.cluster.StatsMessages.StatsResult;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
//#aggregator
public class StatsAggregator extends UntypedActor {
public class StatsAggregator extends AbstractActor {
final int expectedResults;
final ActorRef replyTo;
@ -29,27 +29,26 @@ public class StatsAggregator extends UntypedActor {
}
@Override
public void onReceive(Object message) {
if (message instanceof Integer) {
Integer wordCount = (Integer) message;
results.add(wordCount);
if (results.size() == expectedResults) {
int sum = 0;
for (int c : results)
sum += c;
double meanWordLength = ((double) sum) / results.size();
replyTo.tell(new StatsResult(meanWordLength), getSelf());
getContext().stop(getSelf());
}
} else if (message == ReceiveTimeout.getInstance()) {
replyTo.tell(new JobFailed("Service unavailable, try again later"),
getSelf());
getContext().stop(getSelf());
} else {
unhandled(message);
}
public Receive createReceive() {
return receiveBuilder()
.match(Integer.class, wordCount -> {
results.add(wordCount);
if (results.size() == expectedResults) {
int sum = 0;
for (int c : results) {
sum += c;
}
double meanWordLength = ((double) sum) / results.size();
replyTo.tell(new StatsResult(meanWordLength), self());
getContext().stop(self());
}
})
.match(ReceiveTimeout.class, x -> {
replyTo.tell(new JobFailed("Service unavailable, try again later"),
self());
getContext().stop(self());
})
.build();
}
}

View file

@ -15,7 +15,7 @@ import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.ClusterEvent.ReachableMember;
@ -26,7 +26,7 @@ import akka.cluster.ClusterEvent.ReachabilityEvent;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
public class StatsSampleClient extends UntypedActor {
public class StatsSampleClient extends AbstractActor {
final String servicePath;
final Cancellable tickTask;
@ -40,7 +40,7 @@ public class StatsSampleClient extends UntypedActor {
tickTask = getContext()
.system()
.scheduler()
.schedule(interval, interval, getSelf(), "tick",
.schedule(interval, interval, self(), "tick",
getContext().dispatcher(), null);
}
@ -53,59 +53,48 @@ public class StatsSampleClient extends UntypedActor {
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
cluster.unsubscribe(self());
tickTask.cancel();
}
@Override
public void onReceive(Object message) {
if (message.equals("tick") && !nodes.isEmpty()) {
// just pick any one
List<Address> nodesList = new ArrayList<Address>(nodes);
Address address = nodesList.get(ThreadLocalRandom.current().nextInt(
public Receive createReceive() {
return receiveBuilder()
.matchEquals("tick", x -> !nodes.isEmpty(), x -> {
// just pick any one
List<Address> nodesList = new ArrayList<Address>(nodes);
Address address = nodesList.get(ThreadLocalRandom.current().nextInt(
nodesList.size()));
ActorSelection service = getContext().actorSelection(address + servicePath);
service.tell(new StatsJob("this is the text that will be analyzed"),
getSelf());
} else if (message instanceof StatsResult) {
StatsResult result = (StatsResult) message;
System.out.println(result);
} else if (message instanceof JobFailed) {
JobFailed failed = (JobFailed) message;
System.out.println(failed);
} else if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
nodes.clear();
for (Member member : state.getMembers()) {
if (member.hasRole("compute") && member.status().equals(MemberStatus.up())) {
nodes.add(member.address());
ActorSelection service = getContext().actorSelection(address + servicePath);
service.tell(new StatsJob("this is the text that will be analyzed"),
self());
})
.match(StatsResult.class, System.out::println)
.match(JobFailed.class, System.out::println)
.match(CurrentClusterState.class, state -> {
nodes.clear();
for (Member member : state.getMembers()) {
if (member.hasRole("compute") && member.status().equals(MemberStatus.up())) {
nodes.add(member.address());
}
}
}
})
.match(MemberUp.class, mUp -> {
if (mUp.member().hasRole("compute"))
nodes.add(mUp.member().address());
})
.match(MemberEvent.class, event -> {
nodes.remove(event.member().address());
})
.match(UnreachableMember.class, unreachable -> {
nodes.remove(unreachable.member().address());
})
.match(ReachableMember.class, reachable -> {
if (reachable.member().hasRole("compute"))
nodes.add(reachable.member().address());
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
if (mUp.member().hasRole("compute"))
nodes.add(mUp.member().address());
} else if (message instanceof MemberEvent) {
MemberEvent other = (MemberEvent) message;
nodes.remove(other.member().address());
} else if (message instanceof UnreachableMember) {
UnreachableMember unreachable = (UnreachableMember) message;
nodes.remove(unreachable.member().address());
} else if (message instanceof ReachableMember) {
ReachableMember reachable = (ReachableMember) message;
if (reachable.member().hasRole("compute"))
nodes.add(reachable.member().address());
} else {
unhandled(message);
}
})
.build();
}
}

View file

@ -9,14 +9,14 @@ import akka.routing.ConsistentHashingPool;
import docs.cluster.StatsMessages.StatsJob;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope;
import akka.routing.FromConfig;
import java.util.Collections;
//#service
public class StatsService extends UntypedActor {
public class StatsService extends AbstractActor {
// This router is used both with lookup and deploy of routees. If you
// have a router with only lookup of routees you can use Props.empty()
@ -26,35 +26,30 @@ public class StatsService extends UntypedActor {
"workerRouter");
@Override
public void onReceive(Object message) {
if (message instanceof StatsJob) {
StatsJob job = (StatsJob) message;
if (job.getText().equals("")) {
unhandled(message);
} else {
final String[] words = job.getText().split(" ");
final ActorRef replyTo = getSender();
public Receive createReceive() {
return receiveBuilder()
.match(StatsJob.class, job -> !job.getText().isEmpty(), job -> {
String[] words = job.getText().split(" ");
ActorRef replyTo = sender();
// create actor that collects replies from workers
ActorRef aggregator = getContext().actorOf(
Props.create(StatsAggregator.class, words.length, replyTo));
Props.create(StatsAggregator.class, words.length, replyTo));
// send each word to a worker
for (String word : words) {
workerRouter.tell(new ConsistentHashableEnvelope(word, word),
aggregator);
aggregator);
}
}
} else {
unhandled(message);
}
})
.build();
}
}
//#service
//not used, only for documentation
abstract class StatsService2 extends UntypedActor {
abstract class StatsService2 extends AbstractActor {
//#router-lookup-in-code
int totalInstances = 100;
Iterable<String> routeesPaths = Collections
@ -69,7 +64,7 @@ abstract class StatsService2 extends UntypedActor {
}
//not used, only for documentation
abstract class StatsService3 extends UntypedActor {
abstract class StatsService3 extends AbstractActor {
//#router-deploy-in-code
int totalInstances = 100;
int maxInstancesPerNode = 3;

View file

@ -3,28 +3,25 @@ package docs.cluster;
import java.util.HashMap;
import java.util.Map;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
//#worker
public class StatsWorker extends UntypedActor {
public class StatsWorker extends AbstractActor {
Map<String, Integer> cache = new HashMap<String, Integer>();
@Override
public void onReceive(Object message) {
if (message instanceof String) {
String word = (String) message;
Integer length = cache.get(word);
if (length == null) {
length = word.length();
cache.put(word, length);
}
getSender().tell(length, getSelf());
} else {
unhandled(message);
}
public Receive createReceive() {
return receiveBuilder()
.match(String.class, word -> {
Integer length = cache.get(word);
if (length == null) {
length = word.length();
cache.put(word, length);
}
sender().tell(length, self());
})
.build();
}
}
//#worker

View file

@ -3,7 +3,7 @@ package docs.cluster;
import static docs.cluster.TransformationMessages.BACKEND_REGISTRATION;
import docs.cluster.TransformationMessages.TransformationJob;
import docs.cluster.TransformationMessages.TransformationResult;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberUp;
@ -11,50 +11,46 @@ import akka.cluster.Member;
import akka.cluster.MemberStatus;
//#backend
public class TransformationBackend extends UntypedActor {
public class TransformationBackend extends AbstractActor {
Cluster cluster = Cluster.get(getContext().system());
//subscribe to cluster changes, MemberUp
@Override
public void preStart() {
cluster.subscribe(getSelf(), MemberUp.class);
cluster.subscribe(self(), MemberUp.class);
}
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
cluster.unsubscribe(self());
}
@Override
public void onReceive(Object message) {
if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
getSender().tell(new TransformationResult(job.getText().toUpperCase()),
getSelf());
} else if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
public Receive createReceive() {
return receiveBuilder()
.match(TransformationJob.class, job -> {
sender().tell(new TransformationResult(job.getText().toUpperCase()),
self());
})
.match(CurrentClusterState.class, state -> {
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
}
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
register(mUp.member());
} else {
unhandled(message);
}
})
.match(MemberUp.class, mUp -> {
register(mUp.member());
})
.build();
}
void register(Member member) {
if (member.hasRole("frontend"))
getContext().actorSelection(member.address() + "/user/frontend").tell(
BACKEND_REGISTRATION, getSelf());
BACKEND_REGISTRATION, self());
}
}
//#backend

View file

@ -9,39 +9,35 @@ import docs.cluster.TransformationMessages.JobFailed;
import docs.cluster.TransformationMessages.TransformationJob;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
//#frontend
public class TransformationFrontend extends UntypedActor {
public class TransformationFrontend extends AbstractActor {
List<ActorRef> backends = new ArrayList<ActorRef>();
int jobCounter = 0;
@Override
public void onReceive(Object message) {
if ((message instanceof TransformationJob) && backends.isEmpty()) {
TransformationJob job = (TransformationJob) message;
getSender().tell(
public Receive createReceive() {
return receiveBuilder()
.match(TransformationJob.class, job -> backends.isEmpty(), job -> {
sender().tell(
new JobFailed("Service unavailable, try again later", job),
getSender());
} else if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
jobCounter++;
backends.get(jobCounter % backends.size())
sender());
})
.match(TransformationJob.class, job -> {
jobCounter++;
backends.get(jobCounter % backends.size())
.forward(job, getContext());
} else if (message.equals(BACKEND_REGISTRATION)) {
getContext().watch(getSender());
backends.add(getSender());
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
backends.remove(terminated.getActor());
} else {
unhandled(message);
}
})
.matchEquals(BACKEND_REGISTRATION, x -> {
getContext().watch(sender());
backends.add(sender());
})
.match(Terminated.class, terminated -> {
backends.remove(terminated.getActor());
})
.build();
}
}

View file

@ -167,7 +167,7 @@ New::
}
It's recommended to migrate ``UntypedActor`` to ``AbstractActor`` by implementing
``createReceive`` instead of ``onMessage``.
``createReceive`` instead of ``onReceive``.
Old::