diff --git a/akka-actor/src/main/java/akka/japi/pf/Match.java b/akka-actor/src/main/java/akka/japi/pf/Match.java index d18e854b14..c7ef9ee0c6 100644 --- a/akka-actor/src/main/java/akka/japi/pf/Match.java +++ b/akka-actor/src/main/java/akka/japi/pf/Match.java @@ -121,7 +121,7 @@ public class Match extends AbstractMatch { *

* *

-   *   Matcher<X, Y> matcher = Matcher.create(...);
+   *   Match<X, Y> matcher = Match.create(...);
    *
    *   Y someY = matcher.match(obj);
    * 
diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 59abb36f71..0e963d3a82 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -542,7 +542,7 @@ trait Actor { * Actors are automatically started asynchronously when created. * Empty default implementation. */ - @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest + @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest //#lifecycle-hooks def preStart(): Unit = () @@ -554,7 +554,7 @@ trait Actor { * Is called asynchronously after 'actor.stop()' is invoked. * Empty default implementation. */ - @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest + @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest //#lifecycle-hooks def postStop(): Unit = () @@ -568,7 +568,7 @@ trait Actor { * Is called on a crashed Actor right BEFORE it is restarted to allow clean * up of resources before Actor is terminated. */ - @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest + @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest //#lifecycle-hooks def preRestart(reason: Throwable, message: Option[Any]): Unit = { context.children foreach { child ⇒ @@ -586,7 +586,7 @@ trait Actor { *

* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash. */ - @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest + @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest //#lifecycle-hooks def postRestart(reason: Throwable): Unit = { preStart() diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 925fc9ff14..4fa5428cf2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -166,7 +166,7 @@ trait ActorContext extends ActorRefFactory { * UntypedActorContext is the UntypedActor equivalent of ActorContext, * containing the Java API */ -@deprecated("Use AbstractActor instead of UntypedActor.", since = "2.5.0") +@deprecated("Use AbstractActor.ActorContext instead of UntypedActorContext.", since = "2.5.0") trait UntypedActorContext extends ActorContext { /** diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActorWithStash.scala b/akka-actor/src/main/scala/akka/actor/UntypedActorWithStash.scala index 20fc03bb0d..f460306ab4 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActorWithStash.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActorWithStash.scala @@ -44,14 +44,14 @@ package akka.actor * There is also an unrestricted version [[akka.actor.UntypedActorWithUnrestrictedStash]] that does not * enforce the mailbox type. */ -@deprecated("Use AbstractActor instead of UntypedActor.", since = "2.5.0") +@deprecated("Use AbstractActorWithStash instead of UntypedActorWithStash.", since = "2.5.0") abstract class UntypedActorWithStash extends UntypedActor with Stash /** * Actor base class with `Stash` that enforces an unbounded deque for the actor. * See [[akka.actor.UntypedActorWithStash]] for details on how `Stash` works. */ -@deprecated("Use AbstractActor instead of UntypedActor.", since = "2.5.0") +@deprecated("Use AbstractActorWithUnboundedStash instead of UntypedActorWithUnboundedStash.", since = "2.5.0") abstract class UntypedActorWithUnboundedStash extends UntypedActor with UnboundedStash /** @@ -59,5 +59,5 @@ abstract class UntypedActorWithUnboundedStash extends UntypedActor with Unbounde * manually, and the mailbox should extend the [[akka.dispatch.DequeBasedMessageQueueSemantics]] marker trait. * See [[akka.actor.UntypedActorWithStash]] for details on how `Stash` works. */ -@deprecated("Use AbstractActor instead of UntypedActor.", since = "2.5.0") +@deprecated("Use AbstractActorWithUnrestrictedStash instead of UntypedActorWithUnrestrictedStash.", since = "2.5.0") abstract class UntypedActorWithUnrestrictedStash extends UntypedActor with UnrestrictedStash diff --git a/akka-contrib/src/test/java/akka/contrib/pattern/ReliableProxyTest.java b/akka-contrib/src/test/java/akka/contrib/pattern/ReliableProxyTest.java index 0ab90178d0..0552751ba9 100644 --- a/akka-contrib/src/test/java/akka/contrib/pattern/ReliableProxyTest.java +++ b/akka-contrib/src/test/java/akka/contrib/pattern/ReliableProxyTest.java @@ -31,7 +31,7 @@ public class ReliableProxyTest extends JUnitSuite { private final ActorSystem system = actorSystemResource.getSystem(); static//#demo-proxy - public class ProxyParent extends UntypedActor { + public class ProxyParent extends AbstractActor { private final ActorRef proxy; public ProxyParent(ActorPath targetPath) { @@ -40,17 +40,20 @@ public class ReliableProxyTest extends JUnitSuite { Duration.create(100, TimeUnit.MILLISECONDS))); } - public void onReceive(Object msg) { - if ("hello".equals(msg)) { - proxy.tell("world!", getSelf()); - } + @Override + public Receive createReceive() { + return receiveBuilder() + .matchEquals("hello", m -> { + proxy.tell("world!", self()); + }) + .build(); } } //#demo-proxy static//#demo-transition - public class ProxyTransitionParent extends UntypedActor { + public class ProxyTransitionParent extends AbstractActor { private final ActorRef proxy; private ActorRef client = null; @@ -61,22 +64,24 @@ public class ReliableProxyTest extends JUnitSuite { proxy.tell(new FSM.SubscribeTransitionCallBack(getSelf()), getSelf()); } - public void onReceive(Object msg) { - if ("hello".equals(msg)) { - proxy.tell("world!", getSelf()); - client = getSender(); - } else if (msg instanceof FSM.CurrentState) { - // get initial state - } else if (msg instanceof FSM.Transition) { - @SuppressWarnings("unchecked") - final FSM.Transition transition = - (FSM.Transition) msg; - assert transition.fsmRef().equals(proxy); - if (transition.from().equals(ReliableProxy.active()) && - transition.to().equals(ReliableProxy.idle())) { - client.tell("done", getSelf()); - } - } + @Override + public Receive createReceive() { + return receiveBuilder() + .matchEquals("hello", message -> { + proxy.tell("world!", self()); + client = sender(); + }) + .matchUnchecked(FSM.CurrentState.class, (FSM.CurrentState state) -> { + // get initial state + }) + .matchUnchecked(FSM.Transition.class, (FSM.Transition transition) -> { + assert transition.fsmRef().equals(proxy); + if (transition.from().equals(ReliableProxy.active()) && + transition.to().equals(ReliableProxy.idle())) { + client.tell("done", self()); + } + }) + .build(); } } diff --git a/akka-contrib/src/test/java/akka/contrib/throttle/TimerBasedThrottlerTest.java b/akka-contrib/src/test/java/akka/contrib/throttle/TimerBasedThrottlerTest.java index abf7da484e..081211c62d 100644 --- a/akka-contrib/src/test/java/akka/contrib/throttle/TimerBasedThrottlerTest.java +++ b/akka-contrib/src/test/java/akka/contrib/throttle/TimerBasedThrottlerTest.java @@ -15,8 +15,7 @@ import com.typesafe.config.ConfigFactory; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; -import akka.actor.UntypedActor; -import akka.contrib.throttle.TimerBasedThrottler; +import akka.actor.AbstractActor; import akka.testkit.AkkaJUnitActorSystemResource; public class TimerBasedThrottlerTest extends JUnitSuite { @@ -51,10 +50,14 @@ public class TimerBasedThrottlerTest extends JUnitSuite { static//#demo-code //A simple actor that prints whatever it receives - public class Printer extends UntypedActor { + public class Printer extends AbstractActor { @Override - public void onReceive(Object msg) { - System.out.println(msg); + public Receive createReceive() { + return receiveBuilder() + .matchAny(message -> { + System.out.println(message); + }) + .build(); } } diff --git a/akka-docs/rst/java/code/docs/io/JavaReadBackPressure.java b/akka-docs/rst/java/code/docs/io/JavaReadBackPressure.java index 051d22fefe..fd607a8525 100644 --- a/akka-docs/rst/java/code/docs/io/JavaReadBackPressure.java +++ b/akka-docs/rst/java/code/docs/io/JavaReadBackPressure.java @@ -2,7 +2,7 @@ package docs.io; import akka.actor.ActorRef; import akka.actor.Props; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.io.Inet; import akka.io.Tcp; import akka.io.TcpMessage; @@ -17,23 +17,26 @@ import java.util.List; */ public class JavaReadBackPressure { - static public class Listener extends UntypedActor { + static public class Listener extends AbstractActor { ActorRef tcp; ActorRef listener; @Override //#pull-accepting - public void onReceive(Object message) throws Exception { - if (message instanceof Tcp.Bound) { - listener = sender(); - // Accept connections one by one - listener.tell(TcpMessage.resumeAccepting(1), self()); - } else if (message instanceof Tcp.Connected) { - ActorRef handler = getContext().actorOf(Props.create(PullEcho.class, sender())); - sender().tell(TcpMessage.register(handler), self()); - // Resume accepting connections - listener.tell(TcpMessage.resumeAccepting(1), self()); - } + public Receive createReceive() { + return receiveBuilder() + .match(Tcp.Bound.class, x -> { + listener = sender(); + // Accept connections one by one + listener.tell(TcpMessage.resumeAccepting(1), self()); + }) + .match(Tcp.Connected.class, x -> { + ActorRef handler = getContext().actorOf(Props.create(PullEcho.class, sender())); + sender().tell(TcpMessage.register(handler), self()); + // Resume accepting connections + listener.tell(TcpMessage.resumeAccepting(1), self()); + }) + .build(); } //#pull-accepting @@ -63,7 +66,7 @@ public class JavaReadBackPressure { static public class Ack implements Tcp.Event { } - static public class PullEcho extends UntypedActor { + static public class PullEcho extends AbstractActor { final ActorRef connection; public PullEcho(ActorRef connection) { @@ -77,17 +80,18 @@ public class JavaReadBackPressure { } @Override - public void onReceive(Object message) throws Exception { - if (message instanceof Tcp.Received) { - ByteString data = ((Tcp.Received) message).data(); - connection.tell(TcpMessage.write(data, new Ack()), self()); - } else if (message instanceof Ack) { - connection.tell(TcpMessage.resumeReading(), self()); - } + public Receive createReceive() { + return receiveBuilder() + .match(Tcp.Received.class, message -> { + ByteString data = message.data(); + connection.tell(TcpMessage.write(data, new Ack()), self()); + }) + .match(Ack.class, message -> { + connection.tell(TcpMessage.resumeReading(), self()); + }) + .build(); } //#pull-reading-echo - - } } diff --git a/akka-docs/rst/java/code/docs/io/JavaUdpMulticast.java b/akka-docs/rst/java/code/docs/io/JavaUdpMulticast.java index 2817d33898..626c051f94 100644 --- a/akka-docs/rst/java/code/docs/io/JavaUdpMulticast.java +++ b/akka-docs/rst/java/code/docs/io/JavaUdpMulticast.java @@ -6,7 +6,7 @@ package docs.io; //#imports import akka.actor.ActorRef; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.io.Inet; @@ -57,7 +57,7 @@ public class JavaUdpMulticast { } //#multicast-group - public static class Listener extends UntypedActor { + public static class Listener extends AbstractActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); ActorRef sink; @@ -78,21 +78,22 @@ public class JavaUdpMulticast { } @Override - public void onReceive(Object msg) { - if (msg instanceof Udp.Bound) { - final Udp.Bound b = (Udp.Bound) msg; - log.info("Bound to {}", b.localAddress()); - sink.tell(b, self()); - } else if (msg instanceof Udp.Received) { - final Udp.Received r = (Udp.Received) msg; - final String txt = r.data().decodeString("utf-8"); - log.info("Received '{}' from {}", txt, r.sender()); - sink.tell(txt, self()); - } else unhandled(msg); + public Receive createReceive() { + return receiveBuilder() + .match(Udp.Bound.class, bound -> { + log.info("Bound to {}", bound.localAddress()); + sink.tell(bound, self()); + }) + .match(Udp.Received.class, received -> { + final String txt = received.data().decodeString("utf-8"); + log.info("Received '{}' from {}", txt, received.sender()); + sink.tell(txt, self()); + }) + .build(); } } - public static class Sender extends UntypedActor { + public static class Sender extends AbstractActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); String iface; @@ -114,12 +115,14 @@ public class JavaUdpMulticast { } @Override - public void onReceive(Object msg) { - if (msg instanceof Udp.SimpleSenderReady) { - InetSocketAddress remote = new InetSocketAddress(group + "%" + iface, port); - log.info("Sending message to " + remote); - sender().tell(UdpMessage.send(ByteString.fromString(message), remote), self()); - } else unhandled(msg); + public Receive createReceive() { + return receiveBuilder() + .match(Udp.SimpleSenderReady.class, x -> { + InetSocketAddress remote = new InetSocketAddress(group + "%" + iface, port); + log.info("Sending message to " + remote); + sender().tell(UdpMessage.send(ByteString.fromString(message), remote), self()); + }) + .build(); } } } diff --git a/akka-docs/rst/java/code/docs/io/UdpConnectedDocTest.java b/akka-docs/rst/java/code/docs/io/UdpConnectedDocTest.java index f9ad8d8642..dec0e8bea4 100644 --- a/akka-docs/rst/java/code/docs/io/UdpConnectedDocTest.java +++ b/akka-docs/rst/java/code/docs/io/UdpConnectedDocTest.java @@ -4,12 +4,11 @@ package docs.io; -import akka.testkit.AkkaJUnitActorSystemResource; -import org.junit.ClassRule; +import akka.japi.pf.ReceiveBuilder; import org.junit.Test; import akka.actor.ActorSystem; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; //#imports import java.net.InetSocketAddress; import java.util.ArrayList; @@ -24,14 +23,15 @@ import akka.util.ByteString; public class UdpConnectedDocTest { - static public class Demo extends UntypedActor { + static public class Demo extends AbstractActor { ActorRef connectionActor = null; ActorRef handler = self(); ActorSystem system = getContext().system(); @Override - public void onReceive(Object msg) { - if ("connect".equals(msg)) { + public Receive createReceive() { + ReceiveBuilder builder = receiveBuilder(); + builder.matchEquals("connect", message -> { //#manager final ActorRef udp = UdpConnected.get(system).manager(); //#manager @@ -48,34 +48,33 @@ public class UdpConnectedDocTest { options.add(UdpSO.broadcast(true)); udp.tell(UdpConnectedMessage.connect(handler, remoteAddr, localAddr, options), self()); //#connect-with-options - } else - //#connected - if (msg instanceof UdpConnected.Connected) { - final UdpConnected.Connected conn = (UdpConnected.Connected) msg; - connectionActor = sender(); // Save the worker ref for later use - } - //#connected - else - //#received - if (msg instanceof UdpConnected.Received) { - final UdpConnected.Received recv = (UdpConnected.Received) msg; - final ByteString data = recv.data(); - // and do something with the received data ... - } else if (msg instanceof UdpConnected.CommandFailed) { - final UdpConnected.CommandFailed failed = (UdpConnected.CommandFailed) msg; - final UdpConnected.Command command = failed.cmd(); - // react to failed connect, etc. - } else if (msg instanceof UdpConnected.Disconnected) { - // do something on disconnect - } - //#received - else - if ("send".equals(msg)) { - ByteString data = ByteString.empty(); - //#send - connectionActor.tell(UdpConnectedMessage.send(data), self()); - //#send - } + }); + //#connected + builder.match(UdpConnected.Connected.class, conn -> { + connectionActor = sender(); // Save the worker ref for later use + }); + //#connected + //#received + builder + .match(UdpConnected.Received.class, recv -> { + final ByteString data = recv.data(); + // and do something with the received data ... + }) + .match(UdpConnected.CommandFailed.class, failed -> { + final UdpConnected.Command command = failed.cmd(); + // react to failed connect, etc. + }) + .match(UdpConnected.Disconnected.class, x -> { + // do something on disconnect + }); + //#received + builder.matchEquals("send", x -> { + ByteString data = ByteString.empty(); + //#send + connectionActor.tell(UdpConnectedMessage.send(data), self()); + //#send + }); + return builder.build(); } } diff --git a/akka-docs/rst/java/code/docs/io/UdpDocTest.java b/akka-docs/rst/java/code/docs/io/UdpDocTest.java index 7b5cd2d540..a67b4b06cf 100644 --- a/akka-docs/rst/java/code/docs/io/UdpDocTest.java +++ b/akka-docs/rst/java/code/docs/io/UdpDocTest.java @@ -7,7 +7,7 @@ package docs.io; //#imports import akka.actor.ActorRef; import akka.actor.PoisonPill; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.io.Udp; import akka.io.UdpConnected; import akka.io.UdpConnectedMessage; @@ -21,7 +21,7 @@ import java.net.InetSocketAddress; public class UdpDocTest { //#sender - public static class SimpleSender extends UntypedActor { + public static class SimpleSender extends AbstractActor { final InetSocketAddress remote; public SimpleSender(InetSocketAddress remote) { @@ -33,37 +33,34 @@ public class UdpDocTest { } @Override - public void onReceive(Object msg) { - if (msg instanceof Udp.SimpleSenderReady) { - getContext().become(ready(sender())); - //#sender - sender().tell(UdpMessage.send(ByteString.fromString("hello"), remote), self()); - //#sender - } else unhandled(msg); + public Receive createReceive() { + return receiveBuilder() + .match(Udp.SimpleSenderReady.class, message -> { + getContext().become(ready(sender())); + //#sender + sender().tell(UdpMessage.send(ByteString.fromString("hello"), remote), self()); + //#sender + }) + .build(); } - - private Procedure ready(final ActorRef send) { - return new Procedure() { - @Override - public void apply(Object msg) throws Exception { - if (msg instanceof String) { - final String str = (String) msg; - send.tell(UdpMessage.send(ByteString.fromString(str), remote), self()); - //#sender - if (str.equals("world")) { - send.tell(PoisonPill.getInstance(), self()); - } - //#sender - } else unhandled(msg); - } - }; + private Receive ready(final ActorRef send) { + return receiveBuilder() + .match(String.class, message -> { + send.tell(UdpMessage.send(ByteString.fromString(message), remote), self()); + //#sender + if (message.equals("world")) { + send.tell(PoisonPill.getInstance(), self()); + } + //#sender + }) + .build(); } } //#sender //#listener - public static class Listener extends UntypedActor { + public static class Listener extends AbstractActor { final ActorRef nextActor; public Listener(ActorRef nextActor) { @@ -77,46 +74,42 @@ public class UdpDocTest { } @Override - public void onReceive(Object msg) { - if (msg instanceof Udp.Bound) { - final Udp.Bound b = (Udp.Bound) msg; - //#listener - nextActor.tell(b.localAddress(), sender()); - //#listener - getContext().become(ready(sender())); - } else unhandled(msg); + public Receive createReceive() { + return receiveBuilder() + .match(Udp.Bound.class, bound -> { + //#listener + nextActor.tell(bound.localAddress(), sender()); + //#listener + getContext().become(ready(sender())); + }) + .build(); } - - private Procedure ready(final ActorRef socket) { - return new Procedure() { - @Override - public void apply(Object msg) throws Exception { - if (msg instanceof Udp.Received) { - final Udp.Received r = (Udp.Received) msg; - // echo server example: send back the data - socket.tell(UdpMessage.send(r.data(), r.sender()), self()); - // or do some processing and forward it on - final Object processed = // parse data etc., e.g. using PipelineStage - //#listener - r.data().utf8String(); - //#listener - nextActor.tell(processed, self()); - - } else if (msg.equals(UdpMessage.unbind())) { - socket.tell(msg, self()); - - } else if (msg instanceof Udp.Unbound) { - getContext().stop(self()); - - } else unhandled(msg); - } - }; + + private Receive ready(final ActorRef socket) { + return receiveBuilder() + .match(Udp.Received.class, r -> { + // echo server example: send back the data + socket.tell(UdpMessage.send(r.data(), r.sender()), self()); + // or do some processing and forward it on + final Object processed = // parse data etc., e.g. using PipelineStage + // #listener + r.data().utf8String(); + //#listener + nextActor.tell(processed, self()); + }) + .matchEquals(UdpMessage.unbind(), message -> { + socket.tell(message, self()); + }) + .match(Udp.Unbound.class, message -> { + getContext().stop(self()); + }) + .build(); } } //#listener //#connected - public static class Connected extends UntypedActor { + public static class Connected extends AbstractActor { final InetSocketAddress remote; public Connected(InetSocketAddress remote) { @@ -126,49 +119,45 @@ public class UdpDocTest { final ActorRef mgr = UdpConnected.get(getContext().system()).getManager(); mgr.tell(UdpConnectedMessage.connect(self(), remote), self()); } - + @Override - public void onReceive(Object msg) { - if (msg instanceof UdpConnected.Connected) { - getContext().become(ready(sender())); - //#connected - sender() + public Receive createReceive() { + return receiveBuilder() + .match(UdpConnected.Connected.class, message -> { + getContext().become(ready(sender())); + //#connected + sender() .tell(UdpConnectedMessage.send(ByteString.fromString("hello")), - self()); - //#connected - } else unhandled(msg); + self()); + //#connected + }) + .build(); } - - private Procedure ready(final ActorRef connection) { - return new Procedure() { - @Override - public void apply(Object msg) throws Exception { - if (msg instanceof UdpConnected.Received) { - final UdpConnected.Received r = (UdpConnected.Received) msg; - // process data, send it on, etc. - // #connected - if (r.data().utf8String().equals("hello")) { - connection.tell( - UdpConnectedMessage.send(ByteString.fromString("world")), - self()); - } - // #connected - - } else if (msg instanceof String) { - final String str = (String) msg; - connection - .tell(UdpConnectedMessage.send(ByteString.fromString(str)), - self()); - - } else if (msg.equals(UdpConnectedMessage.disconnect())) { - connection.tell(msg, self()); - - } else if (msg instanceof UdpConnected.Disconnected) { - getContext().stop(self()); - - } else unhandled(msg); - } - }; + + private Receive ready(final ActorRef connection) { + return receiveBuilder() + .match(UdpConnected.Received.class, r -> { + // process data, send it on, etc. + // #connected + if (r.data().utf8String().equals("hello")) { + connection.tell( + UdpConnectedMessage.send(ByteString.fromString("world")), + self()); + } + // #connected + }) + .match(String.class, str -> { + connection + .tell(UdpConnectedMessage.send(ByteString.fromString(str)), + self()); + }) + .matchEquals(UdpConnectedMessage.disconnect(), message -> { + connection.tell(message, self()); + }) + .match(UdpConnected.Disconnected.class, x -> { + getContext().stop(self()); + }) + .build(); } } //#connected diff --git a/akka-docs/rst/java/code/docs/jrouting/ConsistentHashingRouterDocTest.java b/akka-docs/rst/java/code/docs/jrouting/ConsistentHashingRouterDocTest.java index 5eddfa7d6b..39cf9ea551 100644 --- a/akka-docs/rst/java/code/docs/jrouting/ConsistentHashingRouterDocTest.java +++ b/akka-docs/rst/java/code/docs/jrouting/ConsistentHashingRouterDocTest.java @@ -13,7 +13,7 @@ import akka.testkit.JavaTestKit; import akka.actor.ActorSystem; //#imports1 -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.routing.ConsistentHashingRouter.ConsistentHashable; import java.util.Map; @@ -41,24 +41,24 @@ public class ConsistentHashingRouterDocTest extends AbstractJavaTest { static //#cache-actor - public class Cache extends UntypedActor { + public class Cache extends AbstractActor { Map cache = new HashMap(); - public void onReceive(Object msg) { - if (msg instanceof Entry) { - Entry entry = (Entry) msg; - cache.put(entry.key, entry.value); - } else if (msg instanceof Get) { - Get get = (Get) msg; - Object value = cache.get(get.key); - sender().tell(value == null ? NOT_FOUND : value, - getContext().self()); - } else if (msg instanceof Evict) { - Evict evict = (Evict) msg; - cache.remove(evict.key); - } else { - unhandled(msg); - } + @Override + public Receive createReceive() { + return receiveBuilder() + .match(Entry.class, entry -> { + cache.put(entry.key, entry.value); + }) + .match(Get.class, get -> { + Object value = cache.get(get.key); + sender().tell(value == null ? NOT_FOUND : value, + getContext().self()); + }) + .match(Evict.class, evict -> { + cache.remove(evict.key); + }) + .build(); } } diff --git a/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTest.java b/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTest.java index fadec0bf2d..dc2d3d3fa9 100644 --- a/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTest.java +++ b/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTest.java @@ -28,7 +28,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; //#imports1 -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import java.io.Serializable; import java.util.ArrayList; @@ -94,9 +94,14 @@ public class CustomRouterDocTest extends AbstractJavaTest { //#unit-test-logic - static public class Storage extends UntypedActor { - public void onReceive(Object msg) { - sender().tell(msg, self()); + static public class Storage extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .matchAny(message -> { + sender().tell(message, self()); + }) + .build(); } } diff --git a/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java b/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java index 4b3c2c8bd9..2b8bdf6b6a 100644 --- a/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java +++ b/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java @@ -30,7 +30,7 @@ import akka.actor.ActorSystem; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.Terminated; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.routing.ActorRefRoutee; import akka.routing.Routee; import akka.routing.Router; @@ -90,7 +90,7 @@ public class RouterDocTest extends AbstractJavaTest { //#router-in-actor static //#router-in-actor - public class Master extends UntypedActor { + public class Master extends AbstractActor { Router router; { @@ -103,39 +103,54 @@ public class RouterDocTest extends AbstractJavaTest { router = new Router(new RoundRobinRoutingLogic(), routees); } - public void onReceive(Object msg) { - if (msg instanceof Work) { - router.route(msg, sender()); - } else if (msg instanceof Terminated) { - router = router.removeRoutee(((Terminated) msg).actor()); - ActorRef r = getContext().actorOf(Props.create(Worker.class)); - getContext().watch(r); - router = router.addRoutee(new ActorRefRoutee(r)); - } + @Override + public Receive createReceive() { + return receiveBuilder() + .match(Work.class, message -> { + router.route(message, sender()); + }) + .match(Terminated.class, message -> { + router = router.removeRoutee(message.actor()); + ActorRef r = getContext().actorOf(Props.create(Worker.class)); + getContext().watch(r); + router = router.addRoutee(new ActorRefRoutee(r)); + }) + .build(); } } //#router-in-actor - static public class Worker extends UntypedActor { - public void onReceive(Object msg) {} - } - - static public class Echo extends UntypedActor { - public void onReceive(Object msg) { - sender().tell(msg, self()); + static public class Worker extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder().build(); } } - static public class Replier extends UntypedActor { - public void onReceive(Object msg) { - //#reply-with-self - sender().tell("reply", self()); - //#reply-with-self - - //#reply-with-parent - sender().tell("reply", getContext().parent()); - //#reply-with-parent + static public class Echo extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .matchAny(message -> sender().tell(message, self())) + .build(); + } + } + + static public class Replier extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .matchAny(message -> { + //#reply-with-self + sender().tell("reply", self()); + //#reply-with-self + + //#reply-with-parent + sender().tell("reply", getContext().parent()); + //#reply-with-parent + }) + .build(); } } @@ -143,7 +158,7 @@ public class RouterDocTest extends AbstractJavaTest { static //#create-worker-actors - public class Workers extends UntypedActor { + public class Workers extends AbstractActor { @Override public void preStart() { getContext().actorOf(Props.create(Worker.class), "w1"); getContext().actorOf(Props.create(Worker.class), "w2"); @@ -151,12 +166,14 @@ public class RouterDocTest extends AbstractJavaTest { } // ... //#create-worker-actors - - public void onReceive(Object msg) {} + @Override + public Receive createReceive() { + return receiveBuilder().build(); + } } - static public class Parent extends UntypedActor { + static public class Parent extends AbstractActor { //#paths List paths = Arrays.asList("/user/workers/w1", "/user/workers/w2", @@ -346,11 +363,12 @@ public class RouterDocTest extends AbstractJavaTest { Props.create(Worker.class)), "router31"); //#optimal-size-exploring-resize-pool - public void onReceive(Object msg) {} + @Override + public Receive createReceive() { + return receiveBuilder().build(); + } } - - @Test public void createActors() { //#create-workers diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java index 7cc9365554..2d304011ca 100644 --- a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java +++ b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java @@ -25,7 +25,7 @@ public class SchedulerPatternTest extends AbstractJavaTest { static //#schedule-constructor - public class ScheduleInConstructor extends UntypedActor { + public class ScheduleInConstructor extends AbstractActor { private final Cancellable tick = getContext().system().scheduler().schedule( Duration.create(500, TimeUnit.MILLISECONDS), @@ -45,28 +45,25 @@ public class SchedulerPatternTest extends AbstractJavaTest { } @Override - public void onReceive(Object message) throws Exception { - if (message.equals("tick")) { - // do something useful here - //#schedule-constructor - target.tell(message, self()); - //#schedule-constructor - } - //#schedule-constructor - else if (message.equals("restart")) { - throw new ArithmeticException(); - } - //#schedule-constructor - else { - unhandled(message); - } + public Receive createReceive() { + return receiveBuilder() + .matchEquals("tick", message -> { + // do something useful here + //#schedule-constructor + target.tell(message, self()); + //#schedule-constructor + }) + .matchEquals("restart", message -> { + throw new ArithmeticException(); + }) + .build(); } } //#schedule-constructor static //#schedule-receive - public class ScheduleInReceive extends UntypedActor { + public class ScheduleInReceive extends AbstractActor { //#schedule-receive // this variable and constructor is declared here to not show up in the docs final ActorRef target; @@ -88,25 +85,22 @@ public class SchedulerPatternTest extends AbstractJavaTest { } @Override - public void onReceive(Object message) throws Exception { - if (message.equals("tick")) { - // send another periodic tick after the specified delay - getContext().system().scheduler().scheduleOnce( - Duration.create(1, TimeUnit.SECONDS), - self(), "tick", getContext().dispatcher(), null); - // do something useful here - //#schedule-receive - target.tell(message, self()); - //#schedule-receive - } - //#schedule-receive - else if (message.equals("restart")) { - throw new ArithmeticException(); - } - //#schedule-receive - else { - unhandled(message); - } + public Receive createReceive() { + return receiveBuilder() + .matchEquals("tick", message -> { + // send another periodic tick after the specified delay + getContext().system().scheduler().scheduleOnce( + Duration.create(1, TimeUnit.SECONDS), + self(), "tick", getContext().dispatcher(), null); + // do something useful here + //#schedule-receive + target.tell(message, self()); + //#schedule-receive + }) + .matchEquals("restart", message -> { + throw new ArithmeticException(); + }) + .build(); } } //#schedule-receive diff --git a/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java b/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java index 44cb25ce6a..efe36ea61f 100644 --- a/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java +++ b/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java @@ -17,7 +17,7 @@ import akka.actor.Status; import akka.actor.SupervisorStrategy; import akka.actor.SupervisorStrategy.Directive; import akka.actor.Terminated; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.japi.Function; import akka.pattern.Patterns; import akka.util.Timeout; @@ -39,21 +39,21 @@ public class SupervisedAsk { private static class AskTimeout { } - public static class AskSupervisorCreator extends UntypedActor { + public static class AskSupervisorCreator extends AbstractActor { @Override - public void onReceive(Object message) throws Exception { - if (message instanceof AskParam) { - ActorRef supervisor = getContext().actorOf( + public Receive createReceive() { + return receiveBuilder() + .match(AskParam.class, message -> { + ActorRef supervisor = getContext().actorOf( Props.create(AskSupervisor.class)); - supervisor.forward(message, getContext()); - } else { - unhandled(message); - } + supervisor.forward(message, getContext()); + }) + .build(); } } - public static class AskSupervisor extends UntypedActor { + public static class AskSupervisor extends AbstractActor { private ActorRef targetActor; private ActorRef caller; private AskParam askParam; @@ -71,28 +71,31 @@ public class SupervisedAsk { } @Override - public void onReceive(Object message) throws Exception { - if (message instanceof AskParam) { - askParam = (AskParam) message; - caller = sender(); - targetActor = getContext().actorOf(askParam.props); - getContext().watch(targetActor); - targetActor.forward(askParam.message, getContext()); - Scheduler scheduler = getContext().system().scheduler(); - timeoutMessage = scheduler.scheduleOnce(askParam.timeout.duration(), + public Receive createReceive() { + return receiveBuilder() + .match(AskParam.class, message -> { + askParam = message; + caller = sender(); + targetActor = getContext().actorOf(askParam.props); + getContext().watch(targetActor); + targetActor.forward(askParam.message, getContext()); + Scheduler scheduler = getContext().system().scheduler(); + timeoutMessage = scheduler.scheduleOnce(askParam.timeout.duration(), self(), new AskTimeout(), getContext().dispatcher(), null); - } else if (message instanceof Terminated) { - Throwable ex = new ActorKilledException("Target actor terminated."); - caller.tell(new Status.Failure(ex), self()); - timeoutMessage.cancel(); - getContext().stop(self()); - } else if (message instanceof AskTimeout) { - Throwable ex = new TimeoutException("Target actor timed out after " + }) + .match(Terminated.class, message -> { + Throwable ex = new ActorKilledException("Target actor terminated."); + caller.tell(new Status.Failure(ex), self()); + timeoutMessage.cancel(); + getContext().stop(self()); + }) + .match(AskTimeout.class, message -> { + Throwable ex = new TimeoutException("Target actor timed out after " + askParam.timeout.toString()); - caller.tell(new Status.Failure(ex), self()); - getContext().stop(self()); - } else - unhandled(message); + caller.tell(new Status.Failure(ex), self()); + getContext().stop(self()); + }) + .build(); } } diff --git a/akka-docs/rst/java/code/docs/pattern/SupervisedAskSpec.java b/akka-docs/rst/java/code/docs/pattern/SupervisedAskSpec.java index 4316f2e876..f8682bbf24 100644 --- a/akka-docs/rst/java/code/docs/pattern/SupervisedAskSpec.java +++ b/akka-docs/rst/java/code/docs/pattern/SupervisedAskSpec.java @@ -5,12 +5,12 @@ import scala.concurrent.Future; import akka.actor.ActorRef; import akka.actor.ActorRefFactory; import akka.actor.Props; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.util.Timeout; public class SupervisedAskSpec { - public Object execute(Class someActor, + public Object execute(Class someActor, Object message, Timeout timeout, ActorRefFactory actorSystem) throws Exception { // example usage diff --git a/akka-docs/rst/java/code/docs/remoting/RemoteDeploymentDocTest.java b/akka-docs/rst/java/code/docs/remoting/RemoteDeploymentDocTest.java index 834c575cdc..d6b0ac7c20 100644 --- a/akka-docs/rst/java/code/docs/remoting/RemoteDeploymentDocTest.java +++ b/akka-docs/rst/java/code/docs/remoting/RemoteDeploymentDocTest.java @@ -19,13 +19,16 @@ import akka.actor.ActorSystem; import akka.remote.RemoteScope; //#import -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; public class RemoteDeploymentDocTest { - public static class SampleActor extends UntypedActor { - public void onReceive(Object message) { - sender().tell(self(), self()); + public static class SampleActor extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .matchAny(message -> sender().tell(self(), self())) + .build(); } } diff --git a/akka-docs/rst/java/code/docs/stream/IntegrationDocTest.java b/akka-docs/rst/java/code/docs/stream/IntegrationDocTest.java index 65de74c2f9..e1456dee51 100644 --- a/akka-docs/rst/java/code/docs/stream/IntegrationDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/IntegrationDocTest.java @@ -295,18 +295,17 @@ public class IntegrationDocTest extends AbstractJavaTest { //#sometimes-slow-service //#ask-actor - static class Translator extends UntypedActor { + static class Translator extends AbstractActor { @Override - public void onReceive(Object message) { - if (message instanceof String) { - String word = (String) message; - // ... process message - String reply = word.toUpperCase(); - // reply to the ask - sender().tell(reply, self()); - } else { - unhandled(message); - } + public Receive createReceive() { + return receiveBuilder() + .match(String.class, word -> { + // ... process message + String reply = word.toUpperCase(); + // reply to the ask + sender().tell(reply, self()); + }) + .build(); } } //#ask-actor diff --git a/akka-docs/rst/java/code/docs/testkit/ParentChildTest.java b/akka-docs/rst/java/code/docs/testkit/ParentChildTest.java index 778146d06d..7c775f4be1 100644 --- a/akka-docs/rst/java/code/docs/testkit/ParentChildTest.java +++ b/akka-docs/rst/java/code/docs/testkit/ParentChildTest.java @@ -27,54 +27,52 @@ public class ParentChildTest { private final ActorSystem system = actorSystemResource.getSystem(); //#test-example - static class Parent extends UntypedActor { + static class Parent extends AbstractActor { final ActorRef child = getContext().actorOf(Props.create(Child.class), "child"); boolean ponged = false; - @Override public void onReceive(Object message) throws Exception { - if ("pingit".equals(message)) { - child.tell("ping", self()); - } else if ("pong".equals(message)) { - ponged = true; - } else { - unhandled(message); - } + @Override + public Receive createReceive() { + return receiveBuilder() + .matchEquals("pingit", message -> child.tell("ping", self())) + .matchEquals("pong", message -> ponged = true) + .build(); } } - static class Child extends UntypedActor { - @Override public void onReceive(Object message) throws Exception { - if ("ping".equals(message)) { - getContext().parent().tell("pong", self()); - } else { - unhandled(message); - } + static class Child extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .matchEquals("ping", message -> { + getContext().parent().tell("pong", self()); + }) + .build(); } } //#test-example static //#test-dependentchild - class DependentChild extends UntypedActor { + class DependentChild extends AbstractActor { private final ActorRef parent; public DependentChild(ActorRef parent) { this.parent = parent; } - @Override public void onReceive(Object message) throws Exception { - if ("ping".equals(message)) { - parent.tell("pong", self()); - } else { - unhandled(message); - } + @Override + public Receive createReceive() { + return receiveBuilder() + .matchEquals("ping", message -> parent.tell("pong", self())) + .build(); } } //#test-dependentchild static //#test-dependentparent - class DependentParent extends UntypedActor { + class DependentParent extends AbstractActor { final ActorRef child; boolean ponged = false; @@ -82,21 +80,19 @@ public class ParentChildTest { child = getContext().actorOf(childProps, "child"); } - @Override public void onReceive(Object message) throws Exception { - if ("pingit".equals(message)) { - child.tell("ping", self()); - } else if ("pong".equals(message)) { - ponged = true; - } else { - unhandled(message); - } + @Override + public Receive createReceive() { + return receiveBuilder() + .matchEquals("pingit", message -> child.tell("ping", self())) + .matchEquals("pong", message -> ponged = true) + .build(); } } //#test-dependentparent static //#test-dependentparent-generic - class GenericDependentParent extends UntypedActor { + class GenericDependentParent extends AbstractActor { final ActorRef child; boolean ponged = false; @@ -105,14 +101,12 @@ public class ParentChildTest { child = childMaker.apply(getContext()); } - @Override public void onReceive(Object message) throws Exception { - if ("pingit".equals(message)) { - child.tell("ping", self()); - } else if ("pong".equals(message)) { - ponged = true; - } else { - unhandled(message); - } + @Override + public Receive createReceive() { + return receiveBuilder() + .matchEquals("pingit", message -> child.tell("ping", self())) + .matchEquals("pong", message -> ponged = true) + .build(); } } //#test-dependentparent-generic @@ -174,15 +168,21 @@ public class ParentChildTest { } @Override public Actor create() throws Exception { - return new UntypedActor() { + return new AbstractActor() { final ActorRef child = getContext().actorOf(Props.create(Child.class), "child"); - @Override public void onReceive(Object x) throws Exception { - if (sender().equals(child)) { - proxy.ref().forward(x, getContext()); - } else { - child.forward(x, getContext()); - } + @Override + public Receive createReceive() { + return receiveBuilder() + .matchAny(message -> { + if (sender().equals(child)) { + proxy.ref().forward(message, getContext()); + } else { + child.forward(message, getContext()); + } + }) + .build(); + } }; } diff --git a/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java b/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java index 93fc1ea871..472cf33068 100644 --- a/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java +++ b/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java @@ -18,7 +18,7 @@ import akka.actor.Kill; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import scala.concurrent.Await; import scala.concurrent.Future; import akka.testkit.TestActor.AutoPilot; @@ -34,13 +34,17 @@ public class TestKitDocTest { private final ActorSystem system = actorSystemResource.getSystem(); //#test-actor-ref - static class MyActor extends UntypedActor { - public void onReceive(Object o) throws Exception { - if (o.equals("say42")) { - sender().tell(42, self()); - } else if (o instanceof Exception) { - throw (Exception) o; - } + static class MyActor extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .matchEquals("say42", message -> { + sender().tell(42, self()); + }) + .match(Exception.class, (Exception ex) -> { + throw ex; + }) + .build(); } public boolean testMe() { return true; } } @@ -259,14 +263,17 @@ public class TestKitDocTest { //#test-probe new JavaTestKit(system) {{ // simple actor which just forwards messages - class Forwarder extends UntypedActor { + class Forwarder extends AbstractActor { final ActorRef target; @SuppressWarnings("unused") public Forwarder(ActorRef target) { this.target = target; } - public void onReceive(Object msg) { - target.forward(msg, getContext()); + @Override + public Receive createReceive() { + return receiveBuilder() + .matchAny(message -> target.forward(message, getContext())) + .build(); } } diff --git a/akka-docs/rst/java/code/docs/testkit/TestKitSampleTest.java b/akka-docs/rst/java/code/docs/testkit/TestKitSampleTest.java index 055d3ca004..a99ec38c6a 100644 --- a/akka-docs/rst/java/code/docs/testkit/TestKitSampleTest.java +++ b/akka-docs/rst/java/code/docs/testkit/TestKitSampleTest.java @@ -12,25 +12,27 @@ import org.junit.Test; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.testkit.JavaTestKit; import scala.concurrent.duration.Duration; public class TestKitSampleTest { - public static class SomeActor extends UntypedActor { + public static class SomeActor extends AbstractActor { ActorRef target = null; - - public void onReceive(Object msg) { - - if (msg.equals("hello")) { - sender().tell("world", self()); - if (target != null) target.forward(msg, getContext()); - - } else if (msg instanceof ActorRef) { - target = (ActorRef) msg; - sender().tell("done", self()); - } + + @Override + public Receive createReceive() { + return receiveBuilder() + .matchEquals("hello", message -> { + sender().tell("world", self()); + if (target != null) target.forward(message, getContext()); + }) + .match(ActorRef.class, actorRef -> { + target = actorRef; + sender().tell("done", self()); + }) + .build(); } } diff --git a/akka-docs/rst/java/futures.rst b/akka-docs/rst/java/futures.rst index f9af09f782..80d84ec2c7 100644 --- a/akka-docs/rst/java/futures.rst +++ b/akka-docs/rst/java/futures.rst @@ -30,8 +30,8 @@ by the ``ExecutionContexts`` class to wrap ``Executors`` and ``ExecutorServices` Use with Actors --------------- -There are generally two ways of getting a reply from an ``UntypedActor``: the first is by a sent message (``actorRef.tell(msg, sender)``), -which only works if the original sender was an ``UntypedActor``) and the second is through a ``Future``. +There are generally two ways of getting a reply from an ``AbstractActor``: the first is by a sent message (``actorRef.tell(msg, sender)``), +which only works if the original sender was an ``AbstractActor``) and the second is through a ``Future``. Using the ``ActorRef``\'s ``ask`` method to send a message will return a ``Future``. To wait for and retrieve the actual result the simplest method is: @@ -42,11 +42,11 @@ To wait for and retrieve the actual result the simplest method is: .. includecode:: code/docs/future/FutureDocTest.java :include: ask-blocking -This will cause the current thread to block and wait for the ``UntypedActor`` to 'complete' the ``Future`` with it's reply. +This will cause the current thread to block and wait for the ``AbstractActor`` to 'complete' the ``Future`` with it's reply. Blocking is discouraged though as it can cause performance problem. The blocking operations are located in ``Await.result`` and ``Await.ready`` to make it easy to spot where blocking occurs. Alternatives to blocking are discussed further within this documentation. -Also note that the ``Future`` returned by an ``UntypedActor`` is a ``Future`` since an ``UntypedActor`` is dynamic. +Also note that the ``Future`` returned by an ``AbstractActor`` is a ``Future`` since an ``AbstractActor`` is dynamic. That is why the cast to ``String`` is used in the above sample. .. warning:: @@ -64,7 +64,7 @@ Use Directly ------------ A common use case within Akka is to have some computation performed concurrently without needing -the extra utility of an ``UntypedActor``. If you find yourself creating a pool of ``UntypedActor``\s for the sole reason +the extra utility of an ``AbstractActor``. If you find yourself creating a pool of ``AbstractActor``\s for the sole reason of performing a calculation in parallel, there is an easier (and faster) way: .. includecode:: code/docs/future/FutureDocTest.java @@ -75,8 +75,8 @@ of performing a calculation in parallel, there is an easier (and faster) way: In the above code the block passed to ``future`` will be executed by the default ``Dispatcher``, with the return value of the block used to complete the ``Future`` (in this case, the result would be the string: "HelloWorld"). -Unlike a ``Future`` that is returned from an ``UntypedActor``, this ``Future`` is properly typed, -and we also avoid the overhead of managing an ``UntypedActor``. +Unlike a ``Future`` that is returned from an ``AbstractActor``, this ``Future`` is properly typed, +and we also avoid the overhead of managing an ``AbstractActor``. You can also create already completed Futures using the ``Futures`` class, which can be either successes: @@ -229,7 +229,7 @@ Exceptions ---------- Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently. -It doesn't matter if an ``UntypedActor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught +It doesn't matter if an ``AbstractActor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``, calling ``Await.result`` will cause it to be thrown again so it can be handled properly. diff --git a/akka-docs/rst/java/logging.rst b/akka-docs/rst/java/logging.rst index fccc064e87..e318a1a632 100644 --- a/akka-docs/rst/java/logging.rst +++ b/akka-docs/rst/java/logging.rst @@ -402,11 +402,11 @@ MDC values defined by the application One useful feature available in Slf4j is `MDC `_, Akka has a way for let the application specify custom values, you just need to get a specialized :class:`LoggingAdapter`, the :class:`DiagnosticLoggingAdapter`. In order to -get it you will use the factory receiving an UntypedActor as logSource: +get it you will use the factory receiving an AbstractActor as logSource: .. code-block:: scala - // Within your UntypedActor + // Within your AbstractActor final DiagnosticLoggingAdapter log = Logging.getLogger(this); Once you have the logger, you just need to add the custom values before you log something. diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 0ef980d2a5..0cae804d8f 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -85,15 +85,15 @@ actors may of course also process commands that do not change application state Akka persistence supports event sourcing with the ``AbstractPersistentActor`` abstract class. An actor that extends this class uses the ``persist`` method to persist and handle events. The behavior of an ``AbstractPersistentActor`` -is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is demonstrated in the following example. +is defined by implementing ``createReceiveRecover`` and ``createReceive``. This is demonstrated in the following example. .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java#persistent-actor-example The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The ``state`` of the ``ExamplePersistentActor`` is a list of persisted event data contained in ``ExampleState``. -The persistent actor's ``receiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt`` -and ``SnapshotOffer`` messages. The persistent actor's ``receiveCommand`` method is a command handler. In this example, +The persistent actor's ``createReceiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt`` +and ``SnapshotOffer`` messages. The persistent actor's ``createReceive`` method is a command handler. In this example, a command is handled by generating two events which are then persisted and handled. Events are persisted by calling ``persist`` with an event (or a sequence of events) as first argument and an event handler as second argument. @@ -125,8 +125,8 @@ It contains instructions on how to run the ``PersistentActorExample``. It's also possible to switch between different command handlers during normal processing and recovery with ``getContext().become()`` and ``getContext().unbecome()``. To get the actor into the same state after recovery you need to take special care to perform the same state transitions with ``become`` and - ``unbecome`` in the ``receiveRecover`` method as you would have done in the command handler. - Note that when using ``become`` from ``receiveRecover`` it will still only use the ``receiveRecover`` + ``unbecome`` in the ``createReceiveRecover`` method as you would have done in the command handler. + Note that when using ``become`` from ``createReceiveRecover`` it will still only use the ``createReceiveRecover`` behavior when replaying the events. When replay is completed it will use the new behavior. Identifiers diff --git a/akka-docs/rst/java/typed-actors.rst b/akka-docs/rst/java/typed-actors.rst index 0b6622023c..c7c5433299 100644 --- a/akka-docs/rst/java/typed-actors.rst +++ b/akka-docs/rst/java/typed-actors.rst @@ -178,7 +178,7 @@ you can create child Typed Actors by invoking ``typedActorOf(..)`` on that. .. includecode:: code/docs/actor/TypedActorDocTest.java :include: typed-actor-hierarchy -You can also create a child Typed Actor in regular Akka Actors by giving the ``UntypedActorContext`` +You can also create a child Typed Actor in regular Akka Actors by giving the ``AbstractActor.ActorContext`` as an input parameter to TypedActor.get(…). Supervisor Strategy diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/Extra.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/Extra.java index 678532d07b..dd35e36bf1 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/Extra.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/Extra.java @@ -5,7 +5,7 @@ import java.util.Collections; import akka.actor.ActorRef; import akka.actor.Props; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.cluster.metrics.AdaptiveLoadBalancingGroup; import akka.cluster.metrics.AdaptiveLoadBalancingPool; import akka.cluster.routing.ClusterRouterGroup; @@ -16,7 +16,7 @@ import akka.cluster.metrics.HeapMetricsSelector; import akka.cluster.metrics.SystemLoadAverageMetricsSelector; //not used, only for documentation -abstract class FactorialFrontend2 extends UntypedActor { +abstract class FactorialFrontend2 extends AbstractActor { //#router-lookup-in-code int totalInstances = 100; Iterable routeesPaths = Arrays.asList("/user/factorialBackend", ""); @@ -31,7 +31,7 @@ abstract class FactorialFrontend2 extends UntypedActor { } //not used, only for documentation -abstract class FactorialFrontend3 extends UntypedActor { +abstract class FactorialFrontend3 extends AbstractActor { //#router-deploy-in-code int totalInstances = 100; int maxInstancesPerNode = 3; diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialBackend.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialBackend.java index 028de5879e..53dccd8c8b 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialBackend.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialBackend.java @@ -1,38 +1,28 @@ package sample.cluster.factorial; import java.math.BigInteger; -import java.util.concurrent.Callable; import scala.concurrent.Future; -import akka.actor.UntypedActor; -import akka.dispatch.Mapper; +import akka.actor.AbstractActor; import static akka.dispatch.Futures.future; import static akka.pattern.Patterns.pipe; //#backend -public class FactorialBackend extends UntypedActor { +public class FactorialBackend extends AbstractActor { @Override - public void onReceive(Object message) { - if (message instanceof Integer) { - final Integer n = (Integer) message; - Future f = future(new Callable() { - public BigInteger call() { - return factorial(n); - } - }, getContext().dispatcher()); + public Receive createReceive() { + return receiveBuilder() + .match(Integer.class, n -> { + Future f = future(() -> factorial(n), + getContext().dispatcher()); - Future result = f.map( - new Mapper() { - public FactorialResult apply(BigInteger factorial) { - return new FactorialResult(n, factorial); - } - }, getContext().dispatcher()); + Future result = + f.map(factorial -> new FactorialResult(n, factorial), + getContext().dispatcher()); - pipe(result, getContext().dispatcher()).to(getSender()); - - } else { - unhandled(message); - } + pipe(result, getContext().dispatcher()).to(sender()); + }) + .build(); } BigInteger factorial(int n) { diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontend.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontend.java index b1848a964f..4ee81535a2 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontend.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontend.java @@ -4,13 +4,13 @@ import java.util.concurrent.TimeUnit; import scala.concurrent.duration.Duration; import akka.actor.ActorRef; import akka.actor.ReceiveTimeout; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.routing.FromConfig; //#frontend -public class FactorialFrontend extends UntypedActor { +public class FactorialFrontend extends AbstractActor { final int upToN; final boolean repeat; @@ -31,24 +31,22 @@ public class FactorialFrontend extends UntypedActor { } @Override - public void onReceive(Object message) { - if (message instanceof FactorialResult) { - FactorialResult result = (FactorialResult) message; - if (result.n == upToN) { - log.debug("{}! = {}", result.n, result.factorial); - if (repeat) - sendJobs(); - else - getContext().stop(getSelf()); - } - - } else if (message instanceof ReceiveTimeout) { - log.info("Timeout"); - sendJobs(); - - } else { - unhandled(message); - } + public Receive createReceive() { + return receiveBuilder() + .match(FactorialResult.class, result -> { + if (result.n == upToN) { + log.debug("{}! = {}", result.n, result.factorial); + if (repeat) + sendJobs(); + else + getContext().stop(self()); + } + }) + .match(ReceiveTimeout.class, x -> { + log.info("Timeout"); + sendJobs(); + }) + .build(); } void sendJobs() { diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/MetricsListener.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/MetricsListener.java index 0cc06b167e..d9d8c510fe 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/MetricsListener.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/MetricsListener.java @@ -1,7 +1,7 @@ package sample.cluster.factorial; //#metrics-listener -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.metrics.ClusterMetricsChanged; @@ -13,7 +13,7 @@ import akka.cluster.metrics.ClusterMetricsExtension; import akka.event.Logging; import akka.event.LoggingAdapter; -public class MetricsListener extends UntypedActor { +public class MetricsListener extends AbstractActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); Cluster cluster = Cluster.get(getContext().system()); @@ -33,23 +33,21 @@ public class MetricsListener extends UntypedActor { extension.unsubscribe(getSelf()); } - @Override - public void onReceive(Object message) { - if (message instanceof ClusterMetricsChanged) { - ClusterMetricsChanged clusterMetrics = (ClusterMetricsChanged) message; - for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) { - if (nodeMetrics.address().equals(cluster.selfAddress())) { - logHeap(nodeMetrics); - logCpu(nodeMetrics); + public Receive createReceive() { + return receiveBuilder() + .match(ClusterMetricsChanged.class, clusterMetrics -> { + for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) { + if (nodeMetrics.address().equals(cluster.selfAddress())) { + logHeap(nodeMetrics); + logCpu(nodeMetrics); + } } - } - - } else if (message instanceof CurrentClusterState) { - // Ignore. - } else { - unhandled(message); - } + }) + .match(CurrentClusterState.class, message -> { + // Ignore. + }) + .build(); } void logHeap(NodeMetrics nodeMetrics) { diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener.java index 37b763d6d9..d6a861c708 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener.java @@ -1,6 +1,6 @@ package sample.cluster.simple; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.cluster.ClusterEvent.MemberEvent; @@ -10,7 +10,7 @@ import akka.cluster.ClusterEvent.UnreachableMember; import akka.event.Logging; import akka.event.LoggingAdapter; -public class SimpleClusterListener extends UntypedActor { +public class SimpleClusterListener extends AbstractActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); Cluster cluster = Cluster.get(getContext().system()); @@ -18,7 +18,7 @@ public class SimpleClusterListener extends UntypedActor { @Override public void preStart() { //#subscribe - cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), + cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class); //#subscribe } @@ -26,29 +26,24 @@ public class SimpleClusterListener extends UntypedActor { //re-subscribe when restart @Override public void postStop() { - cluster.unsubscribe(getSelf()); + cluster.unsubscribe(self()); } @Override - public void onReceive(Object message) { - if (message instanceof MemberUp) { - MemberUp mUp = (MemberUp) message; - log.info("Member is Up: {}", mUp.member()); - - } else if (message instanceof UnreachableMember) { - UnreachableMember mUnreachable = (UnreachableMember) message; - log.info("Member detected as unreachable: {}", mUnreachable.member()); - - } else if (message instanceof MemberRemoved) { - MemberRemoved mRemoved = (MemberRemoved) message; - log.info("Member is Removed: {}", mRemoved.member()); - - } else if (message instanceof MemberEvent) { - // ignore - - } else { - unhandled(message); - } - + public Receive createReceive() { + return receiveBuilder() + .match(MemberUp.class, mUp -> { + log.info("Member is Up: {}", mUp.member()); + }) + .match(UnreachableMember.class, mUnreachable -> { + log.info("Member detected as unreachable: {}", mUnreachable.member()); + }) + .match(MemberRemoved.class, mRemoved -> { + log.info("Member is Removed: {}", mRemoved.member()); + }) + .match(MemberEvent.class, mEvent -> { + // ignore + }) + .build(); } } diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener2.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener2.java index da623b3976..b7d7872830 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener2.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener2.java @@ -1,6 +1,6 @@ package sample.cluster.simple; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.ClusterEvent.MemberEvent; @@ -10,7 +10,7 @@ import akka.cluster.ClusterEvent.UnreachableMember; import akka.event.Logging; import akka.event.LoggingAdapter; -public class SimpleClusterListener2 extends UntypedActor { +public class SimpleClusterListener2 extends AbstractActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); Cluster cluster = Cluster.get(getContext().system()); @@ -18,40 +18,34 @@ public class SimpleClusterListener2 extends UntypedActor { @Override public void preStart() { //#subscribe - cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class); + cluster.subscribe(self(), MemberEvent.class, UnreachableMember.class); //#subscribe } //re-subscribe when restart @Override public void postStop() { - cluster.unsubscribe(getSelf()); + cluster.unsubscribe(self()); } @Override - public void onReceive(Object message) { - if (message instanceof CurrentClusterState) { - CurrentClusterState state = (CurrentClusterState) message; - log.info("Current members: {}", state.members()); - - } else if (message instanceof MemberUp) { - MemberUp mUp = (MemberUp) message; - log.info("Member is Up: {}", mUp.member()); - - } else if (message instanceof UnreachableMember) { - UnreachableMember mUnreachable = (UnreachableMember) message; - log.info("Member detected as unreachable: {}", mUnreachable.member()); - - } else if (message instanceof MemberRemoved) { - MemberRemoved mRemoved = (MemberRemoved) message; - log.info("Member is Removed: {}", mRemoved.member()); - - } else if (message instanceof MemberEvent) { - // ignore - - } else { - unhandled(message); - } - + public Receive createReceive() { + return receiveBuilder() + .match(CurrentClusterState.class, state -> { + log.info("Current members: {}", state.members()); + }) + .match(MemberUp.class, mUp -> { + log.info("Member is Up: {}", mUp.member()); + }) + .match(UnreachableMember.class, mUnreachable -> { + log.info("Member detected as unreachable: {}", mUnreachable.member()); + }) + .match(MemberRemoved.class, mRemoved -> { + log.info("Member is Removed: {}", mRemoved.member()); + }) + .match(MemberEvent.class, mEvent -> { + // ignore + }) + .build(); } } diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/Extra.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/Extra.java index a7de074c2d..7f92c3218e 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/Extra.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/Extra.java @@ -4,7 +4,7 @@ import java.util.Collections; import akka.actor.ActorRef; import akka.actor.Props; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.cluster.routing.ClusterRouterGroup; import akka.cluster.routing.ClusterRouterGroupSettings; import akka.cluster.routing.ClusterRouterPool; @@ -13,7 +13,7 @@ import akka.routing.ConsistentHashingGroup; import akka.routing.ConsistentHashingPool; //not used, only for documentation -abstract class StatsService2 extends UntypedActor { +abstract class StatsService2 extends AbstractActor { //#router-lookup-in-code int totalInstances = 100; Iterable routeesPaths = Collections @@ -28,7 +28,7 @@ abstract class StatsService2 extends UntypedActor { } //not used, only for documentation -abstract class StatsService3 extends UntypedActor { +abstract class StatsService3 extends AbstractActor { //#router-deploy-in-code int totalInstances = 100; int maxInstancesPerNode = 3; diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsAggregator.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsAggregator.java index f4fb49703b..10fa307bb2 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsAggregator.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsAggregator.java @@ -9,10 +9,10 @@ import sample.cluster.stats.StatsMessages.StatsResult; import scala.concurrent.duration.Duration; import akka.actor.ActorRef; import akka.actor.ReceiveTimeout; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; //#aggregator -public class StatsAggregator extends UntypedActor { +public class StatsAggregator extends AbstractActor { final int expectedResults; final ActorRef replyTo; @@ -29,27 +29,25 @@ public class StatsAggregator extends UntypedActor { } @Override - public void onReceive(Object message) { - if (message instanceof Integer) { - Integer wordCount = (Integer) message; - results.add(wordCount); - if (results.size() == expectedResults) { - int sum = 0; - for (int c : results) - sum += c; - double meanWordLength = ((double) sum) / results.size(); - replyTo.tell(new StatsResult(meanWordLength), getSelf()); - getContext().stop(getSelf()); - } - - } else if (message == ReceiveTimeout.getInstance()) { - replyTo.tell(new JobFailed("Service unavailable, try again later"), - getSelf()); - getContext().stop(getSelf()); - - } else { - unhandled(message); - } + public Receive createReceive() { + return receiveBuilder() + .match(Integer.class, wordCount -> { + results.add(wordCount); + if (results.size() == expectedResults) { + int sum = 0; + for (int c : results) + sum += c; + double meanWordLength = ((double) sum) / results.size(); + replyTo.tell(new StatsResult(meanWordLength), self()); + getContext().stop(self()); + } + }) + .match(ReceiveTimeout.class, timeout -> { + replyTo.tell(new JobFailed("Service unavailable, try again later"), + self()); + getContext().stop(self()); + }) + .build(); } } diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsService.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsService.java index a20be6ade3..310e20766a 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsService.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsService.java @@ -3,12 +3,12 @@ package sample.cluster.stats; import sample.cluster.stats.StatsMessages.StatsJob; import akka.actor.ActorRef; import akka.actor.Props; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope; import akka.routing.FromConfig; //#service -public class StatsService extends UntypedActor { +public class StatsService extends AbstractActor { // This router is used both with lookup and deploy of routees. If you // have a router with only lookup of routees you can use Props.empty() @@ -18,29 +18,23 @@ public class StatsService extends UntypedActor { "workerRouter"); @Override - public void onReceive(Object message) { - if (message instanceof StatsJob) { - StatsJob job = (StatsJob) message; - if (job.getText().equals("")) { - unhandled(message); - } else { + public Receive createReceive() { + return receiveBuilder() + .match(StatsJob.class, job -> !"".equals(job.getText()), job -> { final String[] words = job.getText().split(" "); - final ActorRef replyTo = getSender(); + final ActorRef replyTo = sender(); // create actor that collects replies from workers ActorRef aggregator = getContext().actorOf( - Props.create(StatsAggregator.class, words.length, replyTo)); + Props.create(StatsAggregator.class, words.length, replyTo)); // send each word to a worker for (String word : words) { workerRouter.tell(new ConsistentHashableEnvelope(word, word), - aggregator); + aggregator); } - } - - } else { - unhandled(message); - } + }) + .build(); } } diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsWorker.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsWorker.java index 2d0c97cd19..66b0fe76af 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsWorker.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsWorker.java @@ -3,28 +3,25 @@ package sample.cluster.stats; import java.util.HashMap; import java.util.Map; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; //#worker -public class StatsWorker extends UntypedActor { +public class StatsWorker extends AbstractActor { Map cache = new HashMap(); @Override - public void onReceive(Object message) { - if (message instanceof String) { - String word = (String) message; - Integer length = cache.get(word); - if (length == null) { - length = word.length(); - cache.put(word, length); - } - getSender().tell(length, getSelf()); - - } else { - unhandled(message); - } + public Receive createReceive() { + return receiveBuilder() + .match(String.class, word -> { + Integer length = cache.get(word); + if (length == null) { + length = word.length(); + cache.put(word, length); + } + sender().tell(length, self()); + }) + .build(); } - } //#worker \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/transformation/TransformationBackend.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/transformation/TransformationBackend.java index 2bca33d71a..219b3f6a5c 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/transformation/TransformationBackend.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/transformation/TransformationBackend.java @@ -3,7 +3,7 @@ package sample.cluster.transformation; import static sample.cluster.transformation.TransformationMessages.BACKEND_REGISTRATION; import sample.cluster.transformation.TransformationMessages.TransformationJob; import sample.cluster.transformation.TransformationMessages.TransformationResult; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.ClusterEvent.MemberUp; @@ -11,44 +11,40 @@ import akka.cluster.Member; import akka.cluster.MemberStatus; //#backend -public class TransformationBackend extends UntypedActor { +public class TransformationBackend extends AbstractActor { Cluster cluster = Cluster.get(getContext().system()); //subscribe to cluster changes, MemberUp @Override public void preStart() { - cluster.subscribe(getSelf(), MemberUp.class); + cluster.subscribe(self(), MemberUp.class); } //re-subscribe when restart @Override public void postStop() { - cluster.unsubscribe(getSelf()); + cluster.unsubscribe(self()); } @Override - public void onReceive(Object message) { - if (message instanceof TransformationJob) { - TransformationJob job = (TransformationJob) message; - getSender().tell(new TransformationResult(job.getText().toUpperCase()), - getSelf()); - - } else if (message instanceof CurrentClusterState) { - CurrentClusterState state = (CurrentClusterState) message; - for (Member member : state.getMembers()) { - if (member.status().equals(MemberStatus.up())) { - register(member); + public Receive createReceive() { + return receiveBuilder() + .match(TransformationJob.class, job -> { + sender().tell(new TransformationResult(job.getText().toUpperCase()), + self()); + }) + .match(CurrentClusterState.class, state -> { + for (Member member : state.getMembers()) { + if (member.status().equals(MemberStatus.up())) { + register(member); + } } - } - - } else if (message instanceof MemberUp) { - MemberUp mUp = (MemberUp) message; - register(mUp.member()); - - } else { - unhandled(message); - } + }) + .match(MemberUp.class, mUp -> { + register(mUp.member()); + }) + .build(); } void register(Member member) { diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/transformation/TransformationFrontend.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/transformation/TransformationFrontend.java index 4fab1cd559..d7b434cdc1 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/transformation/TransformationFrontend.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/transformation/TransformationFrontend.java @@ -9,40 +9,35 @@ import sample.cluster.transformation.TransformationMessages.JobFailed; import sample.cluster.transformation.TransformationMessages.TransformationJob; import akka.actor.ActorRef; import akka.actor.Terminated; -import akka.actor.UntypedActor; +import akka.actor.AbstractActor; //#frontend -public class TransformationFrontend extends UntypedActor { +public class TransformationFrontend extends AbstractActor { List backends = new ArrayList(); int jobCounter = 0; @Override - public void onReceive(Object message) { - if ((message instanceof TransformationJob) && backends.isEmpty()) { - TransformationJob job = (TransformationJob) message; - getSender().tell( + public Receive createReceive() { + return receiveBuilder() + .match(TransformationJob.class, job -> backends.isEmpty(), job -> { + sender().tell( new JobFailed("Service unavailable, try again later", job), - getSender()); - - } else if (message instanceof TransformationJob) { - TransformationJob job = (TransformationJob) message; - jobCounter++; - backends.get(jobCounter % backends.size()) + sender()); + }) + .match(TransformationJob.class, job -> { + jobCounter++; + backends.get(jobCounter % backends.size()) .forward(job, getContext()); - - } else if (message.equals(BACKEND_REGISTRATION)) { - getContext().watch(getSender()); - backends.add(getSender()); - - } else if (message instanceof Terminated) { - Terminated terminated = (Terminated) message; - backends.remove(terminated.getActor()); - - } else { - unhandled(message); - } + }) + .matchEquals(BACKEND_REGISTRATION, m -> { + getContext().watch(sender()); + backends.add(sender()); + }) + .match(Terminated.class, terminated -> { + backends.remove(terminated.getActor()); + }) + .build(); } - } //#frontend diff --git a/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java b/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java index 0f39e45da0..6d469a8cce 100644 --- a/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java +++ b/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java @@ -135,32 +135,32 @@ public class ShoppingCart extends AbstractActor { public ShoppingCart(String userId) { this.userId = userId; this.dataKey = LWWMapKey.create("cart-" + userId); - - receive(matchGetCart() - .orElse(matchAddItem()) - .orElse(matchRemoveItem()) - .orElse(matchOther())); } + @Override + public Receive createReceive() { + return matchGetCart() + .orElse(matchAddItem()) + .orElse(matchRemoveItem()) + .orElse(matchOther()); + } //#get-cart - private PartialFunction matchGetCart() { - return ReceiveBuilder - .matchEquals((GET_CART), - s -> receiveGetCart()) - .match(GetSuccess.class, g -> isResponseToGetCart(g), - g -> receiveGetSuccess((GetSuccess>) g)) - .match(NotFound.class, n -> isResponseToGetCart(n), - n -> receiveNotFound((NotFound>) n)) - .match(GetFailure.class, f -> isResponseToGetCart(f), - f -> receiveGetFailure((GetFailure>) f)) + private Receive matchGetCart() { + return receiveBuilder() + .matchEquals(GET_CART, s -> receiveGetCart()) + .match(GetSuccess.class, this::isResponseToGetCart, + g -> receiveGetSuccess((GetSuccess>) g)) + .match(NotFound.class, this::isResponseToGetCart, + n -> receiveNotFound((NotFound>) n)) + .match(GetFailure.class, this::isResponseToGetCart, + f -> receiveGetFailure((GetFailure>) f)) .build(); } - private void receiveGetCart() { Optional ctx = Optional.of(sender()); - replicator.tell(new Replicator.Get>(dataKey, readMajority, ctx), + replicator.tell(new Replicator.Get>(dataKey, readMajority, ctx), self()); } @@ -189,9 +189,9 @@ public class ShoppingCart extends AbstractActor { //#get-cart //#add-item - private PartialFunction matchAddItem() { - return ReceiveBuilder - .match(AddItem.class, r -> receiveAddItem(r)) + private Receive matchAddItem() { + return receiveBuilder() + .match(AddItem.class, this::receiveAddItem) .build(); } @@ -214,14 +214,14 @@ public class ShoppingCart extends AbstractActor { } } - private PartialFunction matchRemoveItem() { - return ReceiveBuilder - .match(RemoveItem.class, r -> receiveRemoveItem(r)) - .match(GetSuccess.class, g -> isResponseToRemoveItem(g), - g -> receiveRemoveItemGetSuccess((GetSuccess>) g)) - .match(GetFailure.class, f -> isResponseToRemoveItem(f), - f -> receiveRemoveItemGetFailure((GetFailure>) f)) - .match(NotFound.class, n -> isResponseToRemoveItem(n), n -> {/* nothing to remove */}) + private Receive matchRemoveItem() { + return receiveBuilder() + .match(RemoveItem.class, this::receiveRemoveItem) + .match(GetSuccess.class, this::isResponseToRemoveItem, + g -> receiveRemoveItemGetSuccess((GetSuccess>) g)) + .match(GetFailure.class, this::isResponseToRemoveItem, + f -> receiveRemoveItemGetFailure((GetFailure>) f)) + .match(NotFound.class, this::isResponseToRemoveItem, n -> {/* nothing to remove */}) .build(); } @@ -258,8 +258,8 @@ public class ShoppingCart extends AbstractActor { } //#remove-item - private PartialFunction matchOther() { - return ReceiveBuilder + private Receive matchOther() { + return receiveBuilder() .match(UpdateSuccess.class, u -> { // ok }) @@ -272,6 +272,4 @@ public class ShoppingCart extends AbstractActor { .build(); } - - } \ No newline at end of file diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java index 9991903897..493e73d549 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java @@ -9,11 +9,8 @@ package sample.persistence; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; -import akka.japi.pf.ReceiveBuilder; import akka.persistence.AbstractPersistentActor; import akka.persistence.SnapshotOffer; -import scala.PartialFunction; -import scala.runtime.BoxedUnit; import java.io.Serializable; import java.util.ArrayList; @@ -88,31 +85,31 @@ class ExamplePersistentActor extends AbstractPersistentActor { public String persistenceId() { return "sample-id-1"; } @Override - public PartialFunction receiveRecover() { - return ReceiveBuilder. - match(Evt.class, state::update). - match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot()).build(); + public Receive createReceiveRecover() { + return receiveBuilder() + .match(Evt.class, state::update) + .match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot()) + .build(); } @Override - public PartialFunction receiveCommand() { - return ReceiveBuilder. - match(Cmd.class, c -> { - final String data = c.getData(); - final Evt evt1 = new Evt(data + "-" + getNumEvents()); - final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1)); - persistAll(asList(evt1, evt2), (Evt evt) -> { - state.update(evt); - if (evt.equals(evt2)) { - context().system().eventStream().publish(evt); - } - }); - }). - match(String.class, s -> s.equals("snap"), s -> saveSnapshot(state.copy())). - match(String.class, s -> s.equals("print"), s -> System.out.println(state)). - build(); + public Receive createReceive() { + return receiveBuilder() + .match(Cmd.class, c -> { + final String data = c.getData(); + final Evt evt1 = new Evt(data + "-" + getNumEvents()); + final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1)); + persistAll(asList(evt1, evt2), (Evt evt) -> { + state.update(evt); + if (evt.equals(evt2)) { + getContext().system().eventStream().publish(evt); + } + }); + }) + .matchEquals("snap", s -> saveSnapshot(state.copy())) + .matchEquals("print", s -> System.out.println(state)) + .build(); } - } //#persistent-actor-example