diff --git a/akka-camel/src/test/java/akka/camel/CustomRouteTest.java b/akka-camel/src/test/java/akka/camel/CustomRouteTest.java index 262c0ea5f5..d8c6cc1394 100644 --- a/akka-camel/src/test/java/akka/camel/CustomRouteTest.java +++ b/akka-camel/src/test/java/akka/camel/CustomRouteTest.java @@ -28,8 +28,11 @@ public class CustomRouteTest extends JUnitSuite { private ActorSystem system = null; private Camel camel = null; - public static class MyActor extends UntypedActor { - @Override public void onReceive(Object o) {} + public static class MyActor extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder().build(); + } } @Before diff --git a/akka-docs/rst/java/code/docs/cluster/FactorialBackend.java b/akka-docs/rst/java/code/docs/cluster/FactorialBackend.java index bdb15acb3a..2411565048 100644 --- a/akka-docs/rst/java/code/docs/cluster/FactorialBackend.java +++ b/akka-docs/rst/java/code/docs/cluster/FactorialBackend.java @@ -1,38 +1,33 @@ package docs.cluster; import java.math.BigInteger; -import java.util.concurrent.Callable; import scala.concurrent.Future; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.dispatch.Mapper; import static akka.dispatch.Futures.future; import static akka.pattern.Patterns.pipe; //#backend -public class FactorialBackend extends UntypedActor { +public class FactorialBackend extends AbstractActor { @Override - public void onReceive(Object message) { - if (message instanceof Integer) { - final Integer n = (Integer) message; - Future f = future(new Callable() { - public BigInteger call() { - return factorial(n); - } - }, getContext().dispatcher()); + public Receive createReceive() { + return receiveBuilder() + .match(Integer.class, n -> { + Future f = future(() -> factorial(n), + getContext().dispatcher()); - Future result = f.map( + Future result = f.map( new Mapper() { public FactorialResult apply(BigInteger factorial) { return new FactorialResult(n, factorial); } }, getContext().dispatcher()); - pipe(result, getContext().dispatcher()).to(getSender()); + pipe(result, getContext().dispatcher()).to(sender()); - } else { - unhandled(message); - } + }) + .build(); } BigInteger factorial(int n) { diff --git a/akka-docs/rst/java/code/docs/cluster/FactorialFrontend.java b/akka-docs/rst/java/code/docs/cluster/FactorialFrontend.java index 0d0ad7be4e..34fb095880 100644 --- a/akka-docs/rst/java/code/docs/cluster/FactorialFrontend.java +++ b/akka-docs/rst/java/code/docs/cluster/FactorialFrontend.java @@ -13,18 +13,16 @@ import akka.cluster.routing.ClusterRouterGroup; import akka.cluster.routing.ClusterRouterGroupSettings; import akka.cluster.routing.ClusterRouterPool; import akka.cluster.routing.ClusterRouterPoolSettings; -import akka.routing.ConsistentHashingGroup; -import akka.routing.ConsistentHashingPool; import scala.concurrent.duration.Duration; import akka.actor.ActorRef; import akka.actor.ReceiveTimeout; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.routing.FromConfig; //#frontend -public class FactorialFrontend extends UntypedActor { +public class FactorialFrontend extends AbstractActor { final int upToN; final boolean repeat; @@ -45,30 +43,28 @@ public class FactorialFrontend extends UntypedActor { } @Override - public void onReceive(Object message) { - if (message instanceof FactorialResult) { - FactorialResult result = (FactorialResult) message; - if (result.n == upToN) { - log.debug("{}! = {}", result.n, result.factorial); - if (repeat) - sendJobs(); - else - getContext().stop(getSelf()); - } - - } else if (message instanceof ReceiveTimeout) { - log.info("Timeout"); - sendJobs(); - - } else { - unhandled(message); - } + public Receive createReceive() { + return receiveBuilder() + .match(FactorialResult.class, result -> { + if (result.n == upToN) { + log.debug("{}! = {}", result.n, result.factorial); + if (repeat) + sendJobs(); + else + getContext().stop(self()); + } + }) + .match(ReceiveTimeout.class, x -> { + log.info("Timeout"); + sendJobs(); + }) + .build(); } void sendJobs() { log.info("Starting batch of factorials up to [{}]", upToN); for (int n = 1; n <= upToN; n++) { - backend.tell(n, getSelf()); + backend.tell(n, self()); } } @@ -76,7 +72,7 @@ public class FactorialFrontend extends UntypedActor { //#frontend //not used, only for documentation -abstract class FactorialFrontend2 extends UntypedActor { +abstract class FactorialFrontend2 extends AbstractActor { //#router-lookup-in-code int totalInstances = 100; Iterable routeesPaths = Arrays.asList("/user/factorialBackend", ""); @@ -91,7 +87,7 @@ abstract class FactorialFrontend2 extends UntypedActor { } //not used, only for documentation -abstract class FactorialFrontend3 extends UntypedActor { +abstract class FactorialFrontend3 extends AbstractActor { //#router-deploy-in-code int totalInstances = 100; int maxInstancesPerNode = 3; diff --git a/akka-docs/rst/java/code/docs/cluster/MetricsListener.java b/akka-docs/rst/java/code/docs/cluster/MetricsListener.java index 265ecf28f6..e9fd928ea2 100644 --- a/akka-docs/rst/java/code/docs/cluster/MetricsListener.java +++ b/akka-docs/rst/java/code/docs/cluster/MetricsListener.java @@ -1,7 +1,7 @@ package docs.cluster; //#metrics-listener -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.metrics.ClusterMetricsChanged; @@ -13,7 +13,7 @@ import akka.cluster.metrics.ClusterMetricsExtension; import akka.event.Logging; import akka.event.LoggingAdapter; -public class MetricsListener extends UntypedActor { +public class MetricsListener extends AbstractActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); Cluster cluster = Cluster.get(getContext().system()); @@ -33,23 +33,21 @@ public class MetricsListener extends UntypedActor { extension.unsubscribe(getSelf()); } - @Override - public void onReceive(Object message) { - if (message instanceof ClusterMetricsChanged) { - ClusterMetricsChanged clusterMetrics = (ClusterMetricsChanged) message; - for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) { - if (nodeMetrics.address().equals(cluster.selfAddress())) { - logHeap(nodeMetrics); - logCpu(nodeMetrics); + public Receive createReceive() { + return receiveBuilder() + .match(ClusterMetricsChanged.class, clusterMetrics -> { + for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) { + if (nodeMetrics.address().equals(cluster.selfAddress())) { + logHeap(nodeMetrics); + logCpu(nodeMetrics); + } } - } - - } else if (message instanceof CurrentClusterState) { - // Ignore. - } else { - unhandled(message); - } + }) + .match(CurrentClusterState.class, message -> { + // Ignore. + }) + .build(); } void logHeap(NodeMetrics nodeMetrics) { diff --git a/akka-docs/rst/java/code/docs/cluster/SimpleClusterListener.java b/akka-docs/rst/java/code/docs/cluster/SimpleClusterListener.java index b48cdcb67c..8774ac83be 100644 --- a/akka-docs/rst/java/code/docs/cluster/SimpleClusterListener.java +++ b/akka-docs/rst/java/code/docs/cluster/SimpleClusterListener.java @@ -1,6 +1,6 @@ package docs.cluster; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.cluster.ClusterEvent.MemberEvent; @@ -10,7 +10,7 @@ import akka.cluster.ClusterEvent.UnreachableMember; import akka.event.Logging; import akka.event.LoggingAdapter; -public class SimpleClusterListener extends UntypedActor { +public class SimpleClusterListener extends AbstractActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); Cluster cluster = Cluster.get(getContext().system()); @@ -30,25 +30,20 @@ public class SimpleClusterListener extends UntypedActor { } @Override - public void onReceive(Object message) { - if (message instanceof MemberUp) { - MemberUp mUp = (MemberUp) message; - log.info("Member is Up: {}", mUp.member()); - - } else if (message instanceof UnreachableMember) { - UnreachableMember mUnreachable = (UnreachableMember) message; - log.info("Member detected as unreachable: {}", mUnreachable.member()); - - } else if (message instanceof MemberRemoved) { - MemberRemoved mRemoved = (MemberRemoved) message; - log.info("Member is Removed: {}", mRemoved.member()); - - } else if (message instanceof MemberEvent) { - // ignore - - } else { - unhandled(message); - } - + public Receive createReceive() { + return receiveBuilder() + .match(MemberUp.class, mUp -> { + log.info("Member is Up: {}", mUp.member()); + }) + .match(UnreachableMember.class, mUnreachable -> { + log.info("Member detected as unreachable: {}", mUnreachable.member()); + }) + .match(MemberRemoved.class, mRemoved -> { + log.info("Member is Removed: {}", mRemoved.member()); + }) + .match(MemberEvent.class, message -> { + // ignore + }) + .build(); } } diff --git a/akka-docs/rst/java/code/docs/cluster/SimpleClusterListener2.java b/akka-docs/rst/java/code/docs/cluster/SimpleClusterListener2.java index 6eebd523ea..e9a9c5fa63 100644 --- a/akka-docs/rst/java/code/docs/cluster/SimpleClusterListener2.java +++ b/akka-docs/rst/java/code/docs/cluster/SimpleClusterListener2.java @@ -1,6 +1,6 @@ package docs.cluster; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.ClusterEvent.MemberEvent; @@ -10,7 +10,7 @@ import akka.cluster.ClusterEvent.UnreachableMember; import akka.event.Logging; import akka.event.LoggingAdapter; -public class SimpleClusterListener2 extends UntypedActor { +public class SimpleClusterListener2 extends AbstractActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); Cluster cluster = Cluster.get(getContext().system()); @@ -29,29 +29,23 @@ public class SimpleClusterListener2 extends UntypedActor { } @Override - public void onReceive(Object message) { - if (message instanceof CurrentClusterState) { - CurrentClusterState state = (CurrentClusterState) message; - log.info("Current members: {}", state.members()); - - } else if (message instanceof MemberUp) { - MemberUp mUp = (MemberUp) message; - log.info("Member is Up: {}", mUp.member()); - - } else if (message instanceof UnreachableMember) { - UnreachableMember mUnreachable = (UnreachableMember) message; - log.info("Member detected as unreachable: {}", mUnreachable.member()); - - } else if (message instanceof MemberRemoved) { - MemberRemoved mRemoved = (MemberRemoved) message; - log.info("Member is Removed: {}", mRemoved.member()); - - } else if (message instanceof MemberEvent) { - // ignore - - } else { - unhandled(message); - } - + public Receive createReceive() { + return receiveBuilder() + .match(CurrentClusterState.class, state -> { + log.info("Current members: {}", state.members()); + }) + .match(MemberUp.class, mUp -> { + log.info("Member is Up: {}", mUp.member()); + }) + .match(UnreachableMember.class, mUnreachable -> { + log.info("Member detected as unreachable: {}", mUnreachable.member()); + }) + .match(MemberRemoved.class, mRemoved -> { + log.info("Member is Removed: {}", mRemoved.member()); + }) + .match(MemberEvent.class, event -> { + // ignore + }) + .build(); } } diff --git a/akka-docs/rst/java/code/docs/cluster/StatsAggregator.java b/akka-docs/rst/java/code/docs/cluster/StatsAggregator.java index 76c8144c75..83c5929bc6 100644 --- a/akka-docs/rst/java/code/docs/cluster/StatsAggregator.java +++ b/akka-docs/rst/java/code/docs/cluster/StatsAggregator.java @@ -9,10 +9,10 @@ import docs.cluster.StatsMessages.StatsResult; import scala.concurrent.duration.Duration; import akka.actor.ActorRef; import akka.actor.ReceiveTimeout; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; //#aggregator -public class StatsAggregator extends UntypedActor { +public class StatsAggregator extends AbstractActor { final int expectedResults; final ActorRef replyTo; @@ -29,27 +29,26 @@ public class StatsAggregator extends UntypedActor { } @Override - public void onReceive(Object message) { - if (message instanceof Integer) { - Integer wordCount = (Integer) message; - results.add(wordCount); - if (results.size() == expectedResults) { - int sum = 0; - for (int c : results) - sum += c; - double meanWordLength = ((double) sum) / results.size(); - replyTo.tell(new StatsResult(meanWordLength), getSelf()); - getContext().stop(getSelf()); - } - - } else if (message == ReceiveTimeout.getInstance()) { - replyTo.tell(new JobFailed("Service unavailable, try again later"), - getSelf()); - getContext().stop(getSelf()); - - } else { - unhandled(message); - } + public Receive createReceive() { + return receiveBuilder() + .match(Integer.class, wordCount -> { + results.add(wordCount); + if (results.size() == expectedResults) { + int sum = 0; + for (int c : results) { + sum += c; + } + double meanWordLength = ((double) sum) / results.size(); + replyTo.tell(new StatsResult(meanWordLength), self()); + getContext().stop(self()); + } + }) + .match(ReceiveTimeout.class, x -> { + replyTo.tell(new JobFailed("Service unavailable, try again later"), + self()); + getContext().stop(self()); + }) + .build(); } } diff --git a/akka-docs/rst/java/code/docs/cluster/StatsSampleClient.java b/akka-docs/rst/java/code/docs/cluster/StatsSampleClient.java index ff9cd530af..c0da371e16 100644 --- a/akka-docs/rst/java/code/docs/cluster/StatsSampleClient.java +++ b/akka-docs/rst/java/code/docs/cluster/StatsSampleClient.java @@ -15,7 +15,7 @@ import scala.concurrent.duration.FiniteDuration; import akka.actor.ActorSelection; import akka.actor.Address; import akka.actor.Cancellable; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.UnreachableMember; import akka.cluster.ClusterEvent.ReachableMember; @@ -26,7 +26,7 @@ import akka.cluster.ClusterEvent.ReachabilityEvent; import akka.cluster.Member; import akka.cluster.MemberStatus; -public class StatsSampleClient extends UntypedActor { +public class StatsSampleClient extends AbstractActor { final String servicePath; final Cancellable tickTask; @@ -40,7 +40,7 @@ public class StatsSampleClient extends UntypedActor { tickTask = getContext() .system() .scheduler() - .schedule(interval, interval, getSelf(), "tick", + .schedule(interval, interval, self(), "tick", getContext().dispatcher(), null); } @@ -53,59 +53,48 @@ public class StatsSampleClient extends UntypedActor { //re-subscribe when restart @Override public void postStop() { - cluster.unsubscribe(getSelf()); + cluster.unsubscribe(self()); tickTask.cancel(); } @Override - public void onReceive(Object message) { - if (message.equals("tick") && !nodes.isEmpty()) { - // just pick any one - List
nodesList = new ArrayList
(nodes); - Address address = nodesList.get(ThreadLocalRandom.current().nextInt( + public Receive createReceive() { + return receiveBuilder() + .matchEquals("tick", x -> !nodes.isEmpty(), x -> { + // just pick any one + List
nodesList = new ArrayList
(nodes); + Address address = nodesList.get(ThreadLocalRandom.current().nextInt( nodesList.size())); - ActorSelection service = getContext().actorSelection(address + servicePath); - service.tell(new StatsJob("this is the text that will be analyzed"), - getSelf()); - - } else if (message instanceof StatsResult) { - StatsResult result = (StatsResult) message; - System.out.println(result); - - } else if (message instanceof JobFailed) { - JobFailed failed = (JobFailed) message; - System.out.println(failed); - - } else if (message instanceof CurrentClusterState) { - CurrentClusterState state = (CurrentClusterState) message; - nodes.clear(); - for (Member member : state.getMembers()) { - if (member.hasRole("compute") && member.status().equals(MemberStatus.up())) { - nodes.add(member.address()); + ActorSelection service = getContext().actorSelection(address + servicePath); + service.tell(new StatsJob("this is the text that will be analyzed"), + self()); + }) + .match(StatsResult.class, System.out::println) + .match(JobFailed.class, System.out::println) + .match(CurrentClusterState.class, state -> { + nodes.clear(); + for (Member member : state.getMembers()) { + if (member.hasRole("compute") && member.status().equals(MemberStatus.up())) { + nodes.add(member.address()); + } } - } + }) + .match(MemberUp.class, mUp -> { + if (mUp.member().hasRole("compute")) + nodes.add(mUp.member().address()); + }) + .match(MemberEvent.class, event -> { + nodes.remove(event.member().address()); + }) + .match(UnreachableMember.class, unreachable -> { + nodes.remove(unreachable.member().address()); + }) + .match(ReachableMember.class, reachable -> { + if (reachable.member().hasRole("compute")) + nodes.add(reachable.member().address()); - } else if (message instanceof MemberUp) { - MemberUp mUp = (MemberUp) message; - if (mUp.member().hasRole("compute")) - nodes.add(mUp.member().address()); - - } else if (message instanceof MemberEvent) { - MemberEvent other = (MemberEvent) message; - nodes.remove(other.member().address()); - - } else if (message instanceof UnreachableMember) { - UnreachableMember unreachable = (UnreachableMember) message; - nodes.remove(unreachable.member().address()); - - } else if (message instanceof ReachableMember) { - ReachableMember reachable = (ReachableMember) message; - if (reachable.member().hasRole("compute")) - nodes.add(reachable.member().address()); - - } else { - unhandled(message); - } + }) + .build(); } } diff --git a/akka-docs/rst/java/code/docs/cluster/StatsService.java b/akka-docs/rst/java/code/docs/cluster/StatsService.java index 993cfc3f4b..f81827b471 100644 --- a/akka-docs/rst/java/code/docs/cluster/StatsService.java +++ b/akka-docs/rst/java/code/docs/cluster/StatsService.java @@ -9,14 +9,14 @@ import akka.routing.ConsistentHashingPool; import docs.cluster.StatsMessages.StatsJob; import akka.actor.ActorRef; import akka.actor.Props; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope; import akka.routing.FromConfig; import java.util.Collections; //#service -public class StatsService extends UntypedActor { +public class StatsService extends AbstractActor { // This router is used both with lookup and deploy of routees. If you // have a router with only lookup of routees you can use Props.empty() @@ -26,35 +26,30 @@ public class StatsService extends UntypedActor { "workerRouter"); @Override - public void onReceive(Object message) { - if (message instanceof StatsJob) { - StatsJob job = (StatsJob) message; - if (job.getText().equals("")) { - unhandled(message); - } else { - final String[] words = job.getText().split(" "); - final ActorRef replyTo = getSender(); + public Receive createReceive() { + return receiveBuilder() + .match(StatsJob.class, job -> !job.getText().isEmpty(), job -> { + String[] words = job.getText().split(" "); + ActorRef replyTo = sender(); // create actor that collects replies from workers ActorRef aggregator = getContext().actorOf( - Props.create(StatsAggregator.class, words.length, replyTo)); + Props.create(StatsAggregator.class, words.length, replyTo)); // send each word to a worker for (String word : words) { workerRouter.tell(new ConsistentHashableEnvelope(word, word), - aggregator); + aggregator); } - } - } else { - unhandled(message); - } + }) + .build(); } } //#service //not used, only for documentation -abstract class StatsService2 extends UntypedActor { +abstract class StatsService2 extends AbstractActor { //#router-lookup-in-code int totalInstances = 100; Iterable routeesPaths = Collections @@ -69,7 +64,7 @@ abstract class StatsService2 extends UntypedActor { } //not used, only for documentation -abstract class StatsService3 extends UntypedActor { +abstract class StatsService3 extends AbstractActor { //#router-deploy-in-code int totalInstances = 100; int maxInstancesPerNode = 3; diff --git a/akka-docs/rst/java/code/docs/cluster/StatsWorker.java b/akka-docs/rst/java/code/docs/cluster/StatsWorker.java index 9489fd0848..b62f2a3402 100644 --- a/akka-docs/rst/java/code/docs/cluster/StatsWorker.java +++ b/akka-docs/rst/java/code/docs/cluster/StatsWorker.java @@ -3,28 +3,25 @@ package docs.cluster; import java.util.HashMap; import java.util.Map; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; //#worker -public class StatsWorker extends UntypedActor { +public class StatsWorker extends AbstractActor { Map cache = new HashMap(); @Override - public void onReceive(Object message) { - if (message instanceof String) { - String word = (String) message; - Integer length = cache.get(word); - if (length == null) { - length = word.length(); - cache.put(word, length); - } - getSender().tell(length, getSelf()); - - } else { - unhandled(message); - } + public Receive createReceive() { + return receiveBuilder() + .match(String.class, word -> { + Integer length = cache.get(word); + if (length == null) { + length = word.length(); + cache.put(word, length); + } + sender().tell(length, self()); + }) + .build(); } - } //#worker \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/cluster/TransformationBackend.java b/akka-docs/rst/java/code/docs/cluster/TransformationBackend.java index afe677b56e..3e1a44c89c 100644 --- a/akka-docs/rst/java/code/docs/cluster/TransformationBackend.java +++ b/akka-docs/rst/java/code/docs/cluster/TransformationBackend.java @@ -3,7 +3,7 @@ package docs.cluster; import static docs.cluster.TransformationMessages.BACKEND_REGISTRATION; import docs.cluster.TransformationMessages.TransformationJob; import docs.cluster.TransformationMessages.TransformationResult; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.ClusterEvent.MemberUp; @@ -11,50 +11,46 @@ import akka.cluster.Member; import akka.cluster.MemberStatus; //#backend -public class TransformationBackend extends UntypedActor { +public class TransformationBackend extends AbstractActor { Cluster cluster = Cluster.get(getContext().system()); //subscribe to cluster changes, MemberUp @Override public void preStart() { - cluster.subscribe(getSelf(), MemberUp.class); + cluster.subscribe(self(), MemberUp.class); } //re-subscribe when restart @Override public void postStop() { - cluster.unsubscribe(getSelf()); + cluster.unsubscribe(self()); } @Override - public void onReceive(Object message) { - if (message instanceof TransformationJob) { - TransformationJob job = (TransformationJob) message; - getSender().tell(new TransformationResult(job.getText().toUpperCase()), - getSelf()); - - } else if (message instanceof CurrentClusterState) { - CurrentClusterState state = (CurrentClusterState) message; - for (Member member : state.getMembers()) { - if (member.status().equals(MemberStatus.up())) { - register(member); + public Receive createReceive() { + return receiveBuilder() + .match(TransformationJob.class, job -> { + sender().tell(new TransformationResult(job.getText().toUpperCase()), + self()); + }) + .match(CurrentClusterState.class, state -> { + for (Member member : state.getMembers()) { + if (member.status().equals(MemberStatus.up())) { + register(member); + } } - } - - } else if (message instanceof MemberUp) { - MemberUp mUp = (MemberUp) message; - register(mUp.member()); - - } else { - unhandled(message); - } + }) + .match(MemberUp.class, mUp -> { + register(mUp.member()); + }) + .build(); } void register(Member member) { if (member.hasRole("frontend")) getContext().actorSelection(member.address() + "/user/frontend").tell( - BACKEND_REGISTRATION, getSelf()); + BACKEND_REGISTRATION, self()); } } //#backend diff --git a/akka-docs/rst/java/code/docs/cluster/TransformationFrontend.java b/akka-docs/rst/java/code/docs/cluster/TransformationFrontend.java index d06d92ca5e..7d7b1c2430 100644 --- a/akka-docs/rst/java/code/docs/cluster/TransformationFrontend.java +++ b/akka-docs/rst/java/code/docs/cluster/TransformationFrontend.java @@ -9,39 +9,35 @@ import docs.cluster.TransformationMessages.JobFailed; import docs.cluster.TransformationMessages.TransformationJob; import akka.actor.ActorRef; import akka.actor.Terminated; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; //#frontend -public class TransformationFrontend extends UntypedActor { +public class TransformationFrontend extends AbstractActor { List backends = new ArrayList(); int jobCounter = 0; @Override - public void onReceive(Object message) { - if ((message instanceof TransformationJob) && backends.isEmpty()) { - TransformationJob job = (TransformationJob) message; - getSender().tell( + public Receive createReceive() { + return receiveBuilder() + .match(TransformationJob.class, job -> backends.isEmpty(), job -> { + sender().tell( new JobFailed("Service unavailable, try again later", job), - getSender()); - - } else if (message instanceof TransformationJob) { - TransformationJob job = (TransformationJob) message; - jobCounter++; - backends.get(jobCounter % backends.size()) + sender()); + }) + .match(TransformationJob.class, job -> { + jobCounter++; + backends.get(jobCounter % backends.size()) .forward(job, getContext()); - - } else if (message.equals(BACKEND_REGISTRATION)) { - getContext().watch(getSender()); - backends.add(getSender()); - - } else if (message instanceof Terminated) { - Terminated terminated = (Terminated) message; - backends.remove(terminated.getActor()); - - } else { - unhandled(message); - } + }) + .matchEquals(BACKEND_REGISTRATION, x -> { + getContext().watch(sender()); + backends.add(sender()); + }) + .match(Terminated.class, terminated -> { + backends.remove(terminated.getActor()); + }) + .build(); } } diff --git a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst index 94be8aa263..a23cc5484e 100644 --- a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst +++ b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst @@ -167,7 +167,7 @@ New:: } It's recommended to migrate ``UntypedActor`` to ``AbstractActor`` by implementing -``createReceive`` instead of ``onMessage``. +``createReceive`` instead of ``onReceive``. Old::