Convert remaining UntypedActor in docs #22182

This commit is contained in:
ortigali 2017-02-04 11:51:30 +05:00
parent 432b53c509
commit 760de5c6d4
39 changed files with 701 additions and 717 deletions

View file

@ -121,7 +121,7 @@ public class Match<I, R> extends AbstractMatch<I, R> {
* <p></p>
*
* <pre><code>
* Matcher&lt;X, Y&gt; matcher = Matcher.create(...);
* Match&lt;X, Y&gt; matcher = Match.create(...);
*
* Y someY = matcher.match(obj);
* </code></pre>

View file

@ -542,7 +542,7 @@ trait Actor {
* Actors are automatically started asynchronously when created.
* Empty default implementation.
*/
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
@throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest
//#lifecycle-hooks
def preStart(): Unit = ()
@ -554,7 +554,7 @@ trait Actor {
* Is called asynchronously after 'actor.stop()' is invoked.
* Empty default implementation.
*/
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
@throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest
//#lifecycle-hooks
def postStop(): Unit = ()
@ -568,7 +568,7 @@ trait Actor {
* Is called on a crashed Actor right BEFORE it is restarted to allow clean
* up of resources before Actor is terminated.
*/
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
@throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest
//#lifecycle-hooks
def preRestart(reason: Throwable, message: Option[Any]): Unit = {
context.children foreach { child
@ -586,7 +586,7 @@ trait Actor {
* <p/>
* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
*/
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
@throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest
//#lifecycle-hooks
def postRestart(reason: Throwable): Unit = {
preStart()

View file

@ -166,7 +166,7 @@ trait ActorContext extends ActorRefFactory {
* UntypedActorContext is the UntypedActor equivalent of ActorContext,
* containing the Java API
*/
@deprecated("Use AbstractActor instead of UntypedActor.", since = "2.5.0")
@deprecated("Use AbstractActor.ActorContext instead of UntypedActorContext.", since = "2.5.0")
trait UntypedActorContext extends ActorContext {
/**

View file

@ -44,14 +44,14 @@ package akka.actor
* There is also an unrestricted version [[akka.actor.UntypedActorWithUnrestrictedStash]] that does not
* enforce the mailbox type.
*/
@deprecated("Use AbstractActor instead of UntypedActor.", since = "2.5.0")
@deprecated("Use AbstractActorWithStash instead of UntypedActorWithStash.", since = "2.5.0")
abstract class UntypedActorWithStash extends UntypedActor with Stash
/**
* Actor base class with `Stash` that enforces an unbounded deque for the actor.
* See [[akka.actor.UntypedActorWithStash]] for details on how `Stash` works.
*/
@deprecated("Use AbstractActor instead of UntypedActor.", since = "2.5.0")
@deprecated("Use AbstractActorWithUnboundedStash instead of UntypedActorWithUnboundedStash.", since = "2.5.0")
abstract class UntypedActorWithUnboundedStash extends UntypedActor with UnboundedStash
/**
@ -59,5 +59,5 @@ abstract class UntypedActorWithUnboundedStash extends UntypedActor with Unbounde
* manually, and the mailbox should extend the [[akka.dispatch.DequeBasedMessageQueueSemantics]] marker trait.
* See [[akka.actor.UntypedActorWithStash]] for details on how `Stash` works.
*/
@deprecated("Use AbstractActor instead of UntypedActor.", since = "2.5.0")
@deprecated("Use AbstractActorWithUnrestrictedStash instead of UntypedActorWithUnrestrictedStash.", since = "2.5.0")
abstract class UntypedActorWithUnrestrictedStash extends UntypedActor with UnrestrictedStash

View file

@ -31,7 +31,7 @@ public class ReliableProxyTest extends JUnitSuite {
private final ActorSystem system = actorSystemResource.getSystem();
static//#demo-proxy
public class ProxyParent extends UntypedActor {
public class ProxyParent extends AbstractActor {
private final ActorRef proxy;
public ProxyParent(ActorPath targetPath) {
@ -40,17 +40,20 @@ public class ReliableProxyTest extends JUnitSuite {
Duration.create(100, TimeUnit.MILLISECONDS)));
}
public void onReceive(Object msg) {
if ("hello".equals(msg)) {
proxy.tell("world!", getSelf());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("hello", m -> {
proxy.tell("world!", self());
})
.build();
}
}
//#demo-proxy
static//#demo-transition
public class ProxyTransitionParent extends UntypedActor {
public class ProxyTransitionParent extends AbstractActor {
private final ActorRef proxy;
private ActorRef client = null;
@ -61,22 +64,24 @@ public class ReliableProxyTest extends JUnitSuite {
proxy.tell(new FSM.SubscribeTransitionCallBack(getSelf()), getSelf());
}
public void onReceive(Object msg) {
if ("hello".equals(msg)) {
proxy.tell("world!", getSelf());
client = getSender();
} else if (msg instanceof FSM.CurrentState<?>) {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("hello", message -> {
proxy.tell("world!", self());
client = sender();
})
.matchUnchecked(FSM.CurrentState.class, (FSM.CurrentState<ReliableProxy.State> state) -> {
// get initial state
} else if (msg instanceof FSM.Transition<?>) {
@SuppressWarnings("unchecked")
final FSM.Transition<ReliableProxy.State> transition =
(FSM.Transition<ReliableProxy.State>) msg;
})
.matchUnchecked(FSM.Transition.class, (FSM.Transition<ReliableProxy.State> transition) -> {
assert transition.fsmRef().equals(proxy);
if (transition.from().equals(ReliableProxy.active()) &&
transition.to().equals(ReliableProxy.idle())) {
client.tell("done", getSelf());
}
client.tell("done", self());
}
})
.build();
}
}

View file

@ -15,8 +15,7 @@ import com.typesafe.config.ConfigFactory;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.contrib.throttle.TimerBasedThrottler;
import akka.actor.AbstractActor;
import akka.testkit.AkkaJUnitActorSystemResource;
public class TimerBasedThrottlerTest extends JUnitSuite {
@ -51,10 +50,14 @@ public class TimerBasedThrottlerTest extends JUnitSuite {
static//#demo-code
//A simple actor that prints whatever it receives
public class Printer extends UntypedActor {
public class Printer extends AbstractActor {
@Override
public void onReceive(Object msg) {
System.out.println(msg);
public Receive createReceive() {
return receiveBuilder()
.matchAny(message -> {
System.out.println(message);
})
.build();
}
}

View file

@ -2,7 +2,7 @@ package docs.io;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.io.Inet;
import akka.io.Tcp;
import akka.io.TcpMessage;
@ -17,23 +17,26 @@ import java.util.List;
*/
public class JavaReadBackPressure {
static public class Listener extends UntypedActor {
static public class Listener extends AbstractActor {
ActorRef tcp;
ActorRef listener;
@Override
//#pull-accepting
public void onReceive(Object message) throws Exception {
if (message instanceof Tcp.Bound) {
public Receive createReceive() {
return receiveBuilder()
.match(Tcp.Bound.class, x -> {
listener = sender();
// Accept connections one by one
listener.tell(TcpMessage.resumeAccepting(1), self());
} else if (message instanceof Tcp.Connected) {
})
.match(Tcp.Connected.class, x -> {
ActorRef handler = getContext().actorOf(Props.create(PullEcho.class, sender()));
sender().tell(TcpMessage.register(handler), self());
// Resume accepting connections
listener.tell(TcpMessage.resumeAccepting(1), self());
}
})
.build();
}
//#pull-accepting
@ -63,7 +66,7 @@ public class JavaReadBackPressure {
static public class Ack implements Tcp.Event {
}
static public class PullEcho extends UntypedActor {
static public class PullEcho extends AbstractActor {
final ActorRef connection;
public PullEcho(ActorRef connection) {
@ -77,17 +80,18 @@ public class JavaReadBackPressure {
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Tcp.Received) {
ByteString data = ((Tcp.Received) message).data();
public Receive createReceive() {
return receiveBuilder()
.match(Tcp.Received.class, message -> {
ByteString data = message.data();
connection.tell(TcpMessage.write(data, new Ack()), self());
} else if (message instanceof Ack) {
})
.match(Ack.class, message -> {
connection.tell(TcpMessage.resumeReading(), self());
}
})
.build();
}
//#pull-reading-echo
}
}

View file

@ -6,7 +6,7 @@ package docs.io;
//#imports
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.io.Inet;
@ -57,7 +57,7 @@ public class JavaUdpMulticast {
}
//#multicast-group
public static class Listener extends UntypedActor {
public static class Listener extends AbstractActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
ActorRef sink;
@ -78,21 +78,22 @@ public class JavaUdpMulticast {
}
@Override
public void onReceive(Object msg) {
if (msg instanceof Udp.Bound) {
final Udp.Bound b = (Udp.Bound) msg;
log.info("Bound to {}", b.localAddress());
sink.tell(b, self());
} else if (msg instanceof Udp.Received) {
final Udp.Received r = (Udp.Received) msg;
final String txt = r.data().decodeString("utf-8");
log.info("Received '{}' from {}", txt, r.sender());
public Receive createReceive() {
return receiveBuilder()
.match(Udp.Bound.class, bound -> {
log.info("Bound to {}", bound.localAddress());
sink.tell(bound, self());
})
.match(Udp.Received.class, received -> {
final String txt = received.data().decodeString("utf-8");
log.info("Received '{}' from {}", txt, received.sender());
sink.tell(txt, self());
} else unhandled(msg);
})
.build();
}
}
public static class Sender extends UntypedActor {
public static class Sender extends AbstractActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
String iface;
@ -114,12 +115,14 @@ public class JavaUdpMulticast {
}
@Override
public void onReceive(Object msg) {
if (msg instanceof Udp.SimpleSenderReady) {
public Receive createReceive() {
return receiveBuilder()
.match(Udp.SimpleSenderReady.class, x -> {
InetSocketAddress remote = new InetSocketAddress(group + "%" + iface, port);
log.info("Sending message to " + remote);
sender().tell(UdpMessage.send(ByteString.fromString(message), remote), self());
} else unhandled(msg);
})
.build();
}
}
}

View file

@ -4,12 +4,11 @@
package docs.io;
import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.ClassRule;
import akka.japi.pf.ReceiveBuilder;
import org.junit.Test;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
//#imports
import java.net.InetSocketAddress;
import java.util.ArrayList;
@ -24,14 +23,15 @@ import akka.util.ByteString;
public class UdpConnectedDocTest {
static public class Demo extends UntypedActor {
static public class Demo extends AbstractActor {
ActorRef connectionActor = null;
ActorRef handler = self();
ActorSystem system = getContext().system();
@Override
public void onReceive(Object msg) {
if ("connect".equals(msg)) {
public Receive createReceive() {
ReceiveBuilder builder = receiveBuilder();
builder.matchEquals("connect", message -> {
//#manager
final ActorRef udp = UdpConnected.get(system).manager();
//#manager
@ -48,34 +48,33 @@ public class UdpConnectedDocTest {
options.add(UdpSO.broadcast(true));
udp.tell(UdpConnectedMessage.connect(handler, remoteAddr, localAddr, options), self());
//#connect-with-options
} else
});
//#connected
if (msg instanceof UdpConnected.Connected) {
final UdpConnected.Connected conn = (UdpConnected.Connected) msg;
builder.match(UdpConnected.Connected.class, conn -> {
connectionActor = sender(); // Save the worker ref for later use
}
});
//#connected
else
//#received
if (msg instanceof UdpConnected.Received) {
final UdpConnected.Received recv = (UdpConnected.Received) msg;
builder
.match(UdpConnected.Received.class, recv -> {
final ByteString data = recv.data();
// and do something with the received data ...
} else if (msg instanceof UdpConnected.CommandFailed) {
final UdpConnected.CommandFailed failed = (UdpConnected.CommandFailed) msg;
})
.match(UdpConnected.CommandFailed.class, failed -> {
final UdpConnected.Command command = failed.cmd();
// react to failed connect, etc.
} else if (msg instanceof UdpConnected.Disconnected) {
})
.match(UdpConnected.Disconnected.class, x -> {
// do something on disconnect
}
});
//#received
else
if ("send".equals(msg)) {
builder.matchEquals("send", x -> {
ByteString data = ByteString.empty();
//#send
connectionActor.tell(UdpConnectedMessage.send(data), self());
//#send
}
});
return builder.build();
}
}

View file

@ -7,7 +7,7 @@ package docs.io;
//#imports
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.io.Udp;
import akka.io.UdpConnected;
import akka.io.UdpConnectedMessage;
@ -21,7 +21,7 @@ import java.net.InetSocketAddress;
public class UdpDocTest {
//#sender
public static class SimpleSender extends UntypedActor {
public static class SimpleSender extends AbstractActor {
final InetSocketAddress remote;
public SimpleSender(InetSocketAddress remote) {
@ -33,37 +33,34 @@ public class UdpDocTest {
}
@Override
public void onReceive(Object msg) {
if (msg instanceof Udp.SimpleSenderReady) {
public Receive createReceive() {
return receiveBuilder()
.match(Udp.SimpleSenderReady.class, message -> {
getContext().become(ready(sender()));
//#sender
sender().tell(UdpMessage.send(ByteString.fromString("hello"), remote), self());
//#sender
} else unhandled(msg);
})
.build();
}
private Procedure<Object> ready(final ActorRef send) {
return new Procedure<Object>() {
@Override
public void apply(Object msg) throws Exception {
if (msg instanceof String) {
final String str = (String) msg;
send.tell(UdpMessage.send(ByteString.fromString(str), remote), self());
private Receive ready(final ActorRef send) {
return receiveBuilder()
.match(String.class, message -> {
send.tell(UdpMessage.send(ByteString.fromString(message), remote), self());
//#sender
if (str.equals("world")) {
if (message.equals("world")) {
send.tell(PoisonPill.getInstance(), self());
}
//#sender
} else unhandled(msg);
}
};
})
.build();
}
}
//#sender
//#listener
public static class Listener extends UntypedActor {
public static class Listener extends AbstractActor {
final ActorRef nextActor;
public Listener(ActorRef nextActor) {
@ -77,46 +74,42 @@ public class UdpDocTest {
}
@Override
public void onReceive(Object msg) {
if (msg instanceof Udp.Bound) {
final Udp.Bound b = (Udp.Bound) msg;
public Receive createReceive() {
return receiveBuilder()
.match(Udp.Bound.class, bound -> {
//#listener
nextActor.tell(b.localAddress(), sender());
nextActor.tell(bound.localAddress(), sender());
//#listener
getContext().become(ready(sender()));
} else unhandled(msg);
})
.build();
}
private Procedure<Object> ready(final ActorRef socket) {
return new Procedure<Object>() {
@Override
public void apply(Object msg) throws Exception {
if (msg instanceof Udp.Received) {
final Udp.Received r = (Udp.Received) msg;
private Receive ready(final ActorRef socket) {
return receiveBuilder()
.match(Udp.Received.class, r -> {
// echo server example: send back the data
socket.tell(UdpMessage.send(r.data(), r.sender()), self());
// or do some processing and forward it on
final Object processed = // parse data etc., e.g. using PipelineStage
//#listener
// #listener
r.data().utf8String();
//#listener
nextActor.tell(processed, self());
} else if (msg.equals(UdpMessage.unbind())) {
socket.tell(msg, self());
} else if (msg instanceof Udp.Unbound) {
})
.matchEquals(UdpMessage.unbind(), message -> {
socket.tell(message, self());
})
.match(Udp.Unbound.class, message -> {
getContext().stop(self());
} else unhandled(msg);
}
};
})
.build();
}
}
//#listener
//#connected
public static class Connected extends UntypedActor {
public static class Connected extends AbstractActor {
final InetSocketAddress remote;
public Connected(InetSocketAddress remote) {
@ -128,23 +121,22 @@ public class UdpDocTest {
}
@Override
public void onReceive(Object msg) {
if (msg instanceof UdpConnected.Connected) {
public Receive createReceive() {
return receiveBuilder()
.match(UdpConnected.Connected.class, message -> {
getContext().become(ready(sender()));
//#connected
sender()
.tell(UdpConnectedMessage.send(ByteString.fromString("hello")),
self());
//#connected
} else unhandled(msg);
})
.build();
}
private Procedure<Object> ready(final ActorRef connection) {
return new Procedure<Object>() {
@Override
public void apply(Object msg) throws Exception {
if (msg instanceof UdpConnected.Received) {
final UdpConnected.Received r = (UdpConnected.Received) msg;
private Receive ready(final ActorRef connection) {
return receiveBuilder()
.match(UdpConnected.Received.class, r -> {
// process data, send it on, etc.
// #connected
if (r.data().utf8String().equals("hello")) {
@ -153,22 +145,19 @@ public class UdpDocTest {
self());
}
// #connected
} else if (msg instanceof String) {
final String str = (String) msg;
})
.match(String.class, str -> {
connection
.tell(UdpConnectedMessage.send(ByteString.fromString(str)),
self());
} else if (msg.equals(UdpConnectedMessage.disconnect())) {
connection.tell(msg, self());
} else if (msg instanceof UdpConnected.Disconnected) {
})
.matchEquals(UdpConnectedMessage.disconnect(), message -> {
connection.tell(message, self());
})
.match(UdpConnected.Disconnected.class, x -> {
getContext().stop(self());
} else unhandled(msg);
}
};
})
.build();
}
}
//#connected

View file

@ -13,7 +13,7 @@ import akka.testkit.JavaTestKit;
import akka.actor.ActorSystem;
//#imports1
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.routing.ConsistentHashingRouter.ConsistentHashable;
import java.util.Map;
@ -41,24 +41,24 @@ public class ConsistentHashingRouterDocTest extends AbstractJavaTest {
static
//#cache-actor
public class Cache extends UntypedActor {
public class Cache extends AbstractActor {
Map<String, String> cache = new HashMap<String, String>();
public void onReceive(Object msg) {
if (msg instanceof Entry) {
Entry entry = (Entry) msg;
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Entry.class, entry -> {
cache.put(entry.key, entry.value);
} else if (msg instanceof Get) {
Get get = (Get) msg;
})
.match(Get.class, get -> {
Object value = cache.get(get.key);
sender().tell(value == null ? NOT_FOUND : value,
getContext().self());
} else if (msg instanceof Evict) {
Evict evict = (Evict) msg;
})
.match(Evict.class, evict -> {
cache.remove(evict.key);
} else {
unhandled(msg);
}
})
.build();
}
}

View file

@ -28,7 +28,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
//#imports1
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import java.io.Serializable;
import java.util.ArrayList;
@ -94,9 +94,14 @@ public class CustomRouterDocTest extends AbstractJavaTest {
//#unit-test-logic
static public class Storage extends UntypedActor {
public void onReceive(Object msg) {
sender().tell(msg, self());
static public class Storage extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchAny(message -> {
sender().tell(message, self());
})
.build();
}
}

View file

@ -30,7 +30,7 @@ import akka.actor.ActorSystem;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.routing.ActorRefRoutee;
import akka.routing.Routee;
import akka.routing.Router;
@ -90,7 +90,7 @@ public class RouterDocTest extends AbstractJavaTest {
//#router-in-actor
static
//#router-in-actor
public class Master extends UntypedActor {
public class Master extends AbstractActor {
Router router;
{
@ -103,32 +103,45 @@ public class RouterDocTest extends AbstractJavaTest {
router = new Router(new RoundRobinRoutingLogic(), routees);
}
public void onReceive(Object msg) {
if (msg instanceof Work) {
router.route(msg, sender());
} else if (msg instanceof Terminated) {
router = router.removeRoutee(((Terminated) msg).actor());
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Work.class, message -> {
router.route(message, sender());
})
.match(Terminated.class, message -> {
router = router.removeRoutee(message.actor());
ActorRef r = getContext().actorOf(Props.create(Worker.class));
getContext().watch(r);
router = router.addRoutee(new ActorRefRoutee(r));
}
})
.build();
}
}
//#router-in-actor
static public class Worker extends UntypedActor {
public void onReceive(Object msg) {}
}
static public class Echo extends UntypedActor {
public void onReceive(Object msg) {
sender().tell(msg, self());
static public class Worker extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().build();
}
}
static public class Replier extends UntypedActor {
public void onReceive(Object msg) {
static public class Echo extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchAny(message -> sender().tell(message, self()))
.build();
}
}
static public class Replier extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchAny(message -> {
//#reply-with-self
sender().tell("reply", self());
//#reply-with-self
@ -136,6 +149,8 @@ public class RouterDocTest extends AbstractJavaTest {
//#reply-with-parent
sender().tell("reply", getContext().parent());
//#reply-with-parent
})
.build();
}
}
@ -143,7 +158,7 @@ public class RouterDocTest extends AbstractJavaTest {
static
//#create-worker-actors
public class Workers extends UntypedActor {
public class Workers extends AbstractActor {
@Override public void preStart() {
getContext().actorOf(Props.create(Worker.class), "w1");
getContext().actorOf(Props.create(Worker.class), "w2");
@ -152,11 +167,13 @@ public class RouterDocTest extends AbstractJavaTest {
// ...
//#create-worker-actors
public void onReceive(Object msg) {}
@Override
public Receive createReceive() {
return receiveBuilder().build();
}
}
static public class Parent extends UntypedActor {
static public class Parent extends AbstractActor {
//#paths
List<String> paths = Arrays.asList("/user/workers/w1", "/user/workers/w2",
@ -346,10 +363,11 @@ public class RouterDocTest extends AbstractJavaTest {
Props.create(Worker.class)), "router31");
//#optimal-size-exploring-resize-pool
public void onReceive(Object msg) {}
@Override
public Receive createReceive() {
return receiveBuilder().build();
}
}
@Test
public void createActors() {

View file

@ -25,7 +25,7 @@ public class SchedulerPatternTest extends AbstractJavaTest {
static
//#schedule-constructor
public class ScheduleInConstructor extends UntypedActor {
public class ScheduleInConstructor extends AbstractActor {
private final Cancellable tick = getContext().system().scheduler().schedule(
Duration.create(500, TimeUnit.MILLISECONDS),
@ -45,28 +45,25 @@ public class SchedulerPatternTest extends AbstractJavaTest {
}
@Override
public void onReceive(Object message) throws Exception {
if (message.equals("tick")) {
public Receive createReceive() {
return receiveBuilder()
.matchEquals("tick", message -> {
// do something useful here
//#schedule-constructor
target.tell(message, self());
//#schedule-constructor
}
//#schedule-constructor
else if (message.equals("restart")) {
})
.matchEquals("restart", message -> {
throw new ArithmeticException();
}
//#schedule-constructor
else {
unhandled(message);
}
})
.build();
}
}
//#schedule-constructor
static
//#schedule-receive
public class ScheduleInReceive extends UntypedActor {
public class ScheduleInReceive extends AbstractActor {
//#schedule-receive
// this variable and constructor is declared here to not show up in the docs
final ActorRef target;
@ -88,8 +85,9 @@ public class SchedulerPatternTest extends AbstractJavaTest {
}
@Override
public void onReceive(Object message) throws Exception {
if (message.equals("tick")) {
public Receive createReceive() {
return receiveBuilder()
.matchEquals("tick", message -> {
// send another periodic tick after the specified delay
getContext().system().scheduler().scheduleOnce(
Duration.create(1, TimeUnit.SECONDS),
@ -98,15 +96,11 @@ public class SchedulerPatternTest extends AbstractJavaTest {
//#schedule-receive
target.tell(message, self());
//#schedule-receive
}
//#schedule-receive
else if (message.equals("restart")) {
})
.matchEquals("restart", message -> {
throw new ArithmeticException();
}
//#schedule-receive
else {
unhandled(message);
}
})
.build();
}
}
//#schedule-receive

View file

@ -17,7 +17,7 @@ import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.actor.SupervisorStrategy.Directive;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.japi.Function;
import akka.pattern.Patterns;
import akka.util.Timeout;
@ -39,21 +39,21 @@ public class SupervisedAsk {
private static class AskTimeout {
}
public static class AskSupervisorCreator extends UntypedActor {
public static class AskSupervisorCreator extends AbstractActor {
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof AskParam) {
public Receive createReceive() {
return receiveBuilder()
.match(AskParam.class, message -> {
ActorRef supervisor = getContext().actorOf(
Props.create(AskSupervisor.class));
supervisor.forward(message, getContext());
} else {
unhandled(message);
}
})
.build();
}
}
public static class AskSupervisor extends UntypedActor {
public static class AskSupervisor extends AbstractActor {
private ActorRef targetActor;
private ActorRef caller;
private AskParam askParam;
@ -71,9 +71,10 @@ public class SupervisedAsk {
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof AskParam) {
askParam = (AskParam) message;
public Receive createReceive() {
return receiveBuilder()
.match(AskParam.class, message -> {
askParam = message;
caller = sender();
targetActor = getContext().actorOf(askParam.props);
getContext().watch(targetActor);
@ -81,18 +82,20 @@ public class SupervisedAsk {
Scheduler scheduler = getContext().system().scheduler();
timeoutMessage = scheduler.scheduleOnce(askParam.timeout.duration(),
self(), new AskTimeout(), getContext().dispatcher(), null);
} else if (message instanceof Terminated) {
})
.match(Terminated.class, message -> {
Throwable ex = new ActorKilledException("Target actor terminated.");
caller.tell(new Status.Failure(ex), self());
timeoutMessage.cancel();
getContext().stop(self());
} else if (message instanceof AskTimeout) {
})
.match(AskTimeout.class, message -> {
Throwable ex = new TimeoutException("Target actor timed out after "
+ askParam.timeout.toString());
caller.tell(new Status.Failure(ex), self());
getContext().stop(self());
} else
unhandled(message);
})
.build();
}
}

View file

@ -5,12 +5,12 @@ import scala.concurrent.Future;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.util.Timeout;
public class SupervisedAskSpec {
public Object execute(Class<? extends UntypedActor> someActor,
public Object execute(Class<? extends AbstractActor> someActor,
Object message, Timeout timeout, ActorRefFactory actorSystem)
throws Exception {
// example usage

View file

@ -19,13 +19,16 @@ import akka.actor.ActorSystem;
import akka.remote.RemoteScope;
//#import
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
public class RemoteDeploymentDocTest {
public static class SampleActor extends UntypedActor {
public void onReceive(Object message) {
sender().tell(self(), self());
public static class SampleActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchAny(message -> sender().tell(self(), self()))
.build();
}
}

View file

@ -295,18 +295,17 @@ public class IntegrationDocTest extends AbstractJavaTest {
//#sometimes-slow-service
//#ask-actor
static class Translator extends UntypedActor {
static class Translator extends AbstractActor {
@Override
public void onReceive(Object message) {
if (message instanceof String) {
String word = (String) message;
public Receive createReceive() {
return receiveBuilder()
.match(String.class, word -> {
// ... process message
String reply = word.toUpperCase();
// reply to the ask
sender().tell(reply, self());
} else {
unhandled(message);
}
})
.build();
}
}
//#ask-actor

View file

@ -27,54 +27,52 @@ public class ParentChildTest {
private final ActorSystem system = actorSystemResource.getSystem();
//#test-example
static class Parent extends UntypedActor {
static class Parent extends AbstractActor {
final ActorRef child = getContext().actorOf(Props.create(Child.class), "child");
boolean ponged = false;
@Override public void onReceive(Object message) throws Exception {
if ("pingit".equals(message)) {
child.tell("ping", self());
} else if ("pong".equals(message)) {
ponged = true;
} else {
unhandled(message);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("pingit", message -> child.tell("ping", self()))
.matchEquals("pong", message -> ponged = true)
.build();
}
}
static class Child extends UntypedActor {
@Override public void onReceive(Object message) throws Exception {
if ("ping".equals(message)) {
static class Child extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("ping", message -> {
getContext().parent().tell("pong", self());
} else {
unhandled(message);
}
})
.build();
}
}
//#test-example
static
//#test-dependentchild
class DependentChild extends UntypedActor {
class DependentChild extends AbstractActor {
private final ActorRef parent;
public DependentChild(ActorRef parent) {
this.parent = parent;
}
@Override public void onReceive(Object message) throws Exception {
if ("ping".equals(message)) {
parent.tell("pong", self());
} else {
unhandled(message);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("ping", message -> parent.tell("pong", self()))
.build();
}
}
//#test-dependentchild
static
//#test-dependentparent
class DependentParent extends UntypedActor {
class DependentParent extends AbstractActor {
final ActorRef child;
boolean ponged = false;
@ -82,21 +80,19 @@ public class ParentChildTest {
child = getContext().actorOf(childProps, "child");
}
@Override public void onReceive(Object message) throws Exception {
if ("pingit".equals(message)) {
child.tell("ping", self());
} else if ("pong".equals(message)) {
ponged = true;
} else {
unhandled(message);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("pingit", message -> child.tell("ping", self()))
.matchEquals("pong", message -> ponged = true)
.build();
}
}
//#test-dependentparent
static
//#test-dependentparent-generic
class GenericDependentParent extends UntypedActor {
class GenericDependentParent extends AbstractActor {
final ActorRef child;
boolean ponged = false;
@ -105,14 +101,12 @@ public class ParentChildTest {
child = childMaker.apply(getContext());
}
@Override public void onReceive(Object message) throws Exception {
if ("pingit".equals(message)) {
child.tell("ping", self());
} else if ("pong".equals(message)) {
ponged = true;
} else {
unhandled(message);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("pingit", message -> child.tell("ping", self()))
.matchEquals("pong", message -> ponged = true)
.build();
}
}
//#test-dependentparent-generic
@ -174,15 +168,21 @@ public class ParentChildTest {
}
@Override public Actor create() throws Exception {
return new UntypedActor() {
return new AbstractActor() {
final ActorRef child = getContext().actorOf(Props.create(Child.class), "child");
@Override public void onReceive(Object x) throws Exception {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchAny(message -> {
if (sender().equals(child)) {
proxy.ref().forward(x, getContext());
proxy.ref().forward(message, getContext());
} else {
child.forward(x, getContext());
child.forward(message, getContext());
}
})
.build();
}
};
}

View file

@ -18,7 +18,7 @@ import akka.actor.Kill;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import scala.concurrent.Await;
import scala.concurrent.Future;
import akka.testkit.TestActor.AutoPilot;
@ -34,13 +34,17 @@ public class TestKitDocTest {
private final ActorSystem system = actorSystemResource.getSystem();
//#test-actor-ref
static class MyActor extends UntypedActor {
public void onReceive(Object o) throws Exception {
if (o.equals("say42")) {
static class MyActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("say42", message -> {
sender().tell(42, self());
} else if (o instanceof Exception) {
throw (Exception) o;
}
})
.match(Exception.class, (Exception ex) -> {
throw ex;
})
.build();
}
public boolean testMe() { return true; }
}
@ -259,14 +263,17 @@ public class TestKitDocTest {
//#test-probe
new JavaTestKit(system) {{
// simple actor which just forwards messages
class Forwarder extends UntypedActor {
class Forwarder extends AbstractActor {
final ActorRef target;
@SuppressWarnings("unused")
public Forwarder(ActorRef target) {
this.target = target;
}
public void onReceive(Object msg) {
target.forward(msg, getContext());
@Override
public Receive createReceive() {
return receiveBuilder()
.matchAny(message -> target.forward(message, getContext()))
.build();
}
}

View file

@ -12,25 +12,27 @@ import org.junit.Test;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.testkit.JavaTestKit;
import scala.concurrent.duration.Duration;
public class TestKitSampleTest {
public static class SomeActor extends UntypedActor {
public static class SomeActor extends AbstractActor {
ActorRef target = null;
public void onReceive(Object msg) {
if (msg.equals("hello")) {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("hello", message -> {
sender().tell("world", self());
if (target != null) target.forward(msg, getContext());
} else if (msg instanceof ActorRef) {
target = (ActorRef) msg;
if (target != null) target.forward(message, getContext());
})
.match(ActorRef.class, actorRef -> {
target = actorRef;
sender().tell("done", self());
}
})
.build();
}
}

View file

@ -30,8 +30,8 @@ by the ``ExecutionContexts`` class to wrap ``Executors`` and ``ExecutorServices`
Use with Actors
---------------
There are generally two ways of getting a reply from an ``UntypedActor``: the first is by a sent message (``actorRef.tell(msg, sender)``),
which only works if the original sender was an ``UntypedActor``) and the second is through a ``Future``.
There are generally two ways of getting a reply from an ``AbstractActor``: the first is by a sent message (``actorRef.tell(msg, sender)``),
which only works if the original sender was an ``AbstractActor``) and the second is through a ``Future``.
Using the ``ActorRef``\'s ``ask`` method to send a message will return a ``Future``.
To wait for and retrieve the actual result the simplest method is:
@ -42,11 +42,11 @@ To wait for and retrieve the actual result the simplest method is:
.. includecode:: code/docs/future/FutureDocTest.java
:include: ask-blocking
This will cause the current thread to block and wait for the ``UntypedActor`` to 'complete' the ``Future`` with it's reply.
This will cause the current thread to block and wait for the ``AbstractActor`` to 'complete' the ``Future`` with it's reply.
Blocking is discouraged though as it can cause performance problem.
The blocking operations are located in ``Await.result`` and ``Await.ready`` to make it easy to spot where blocking occurs.
Alternatives to blocking are discussed further within this documentation.
Also note that the ``Future`` returned by an ``UntypedActor`` is a ``Future<Object>`` since an ``UntypedActor`` is dynamic.
Also note that the ``Future`` returned by an ``AbstractActor`` is a ``Future<Object>`` since an ``AbstractActor`` is dynamic.
That is why the cast to ``String`` is used in the above sample.
.. warning::
@ -64,7 +64,7 @@ Use Directly
------------
A common use case within Akka is to have some computation performed concurrently without needing
the extra utility of an ``UntypedActor``. If you find yourself creating a pool of ``UntypedActor``\s for the sole reason
the extra utility of an ``AbstractActor``. If you find yourself creating a pool of ``AbstractActor``\s for the sole reason
of performing a calculation in parallel, there is an easier (and faster) way:
.. includecode:: code/docs/future/FutureDocTest.java
@ -75,8 +75,8 @@ of performing a calculation in parallel, there is an easier (and faster) way:
In the above code the block passed to ``future`` will be executed by the default ``Dispatcher``,
with the return value of the block used to complete the ``Future`` (in this case, the result would be the string: "HelloWorld").
Unlike a ``Future`` that is returned from an ``UntypedActor``, this ``Future`` is properly typed,
and we also avoid the overhead of managing an ``UntypedActor``.
Unlike a ``Future`` that is returned from an ``AbstractActor``, this ``Future`` is properly typed,
and we also avoid the overhead of managing an ``AbstractActor``.
You can also create already completed Futures using the ``Futures`` class, which can be either successes:
@ -229,7 +229,7 @@ Exceptions
----------
Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently.
It doesn't matter if an ``UntypedActor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught
It doesn't matter if an ``AbstractActor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught
the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``,
calling ``Await.result`` will cause it to be thrown again so it can be handled properly.

View file

@ -402,11 +402,11 @@ MDC values defined by the application
One useful feature available in Slf4j is `MDC <http://logback.qos.ch/manual/mdc.html>`_,
Akka has a way for let the application specify custom values, you just need to get a
specialized :class:`LoggingAdapter`, the :class:`DiagnosticLoggingAdapter`. In order to
get it you will use the factory receiving an UntypedActor as logSource:
get it you will use the factory receiving an AbstractActor as logSource:
.. code-block:: scala
// Within your UntypedActor
// Within your AbstractActor
final DiagnosticLoggingAdapter log = Logging.getLogger(this);
Once you have the logger, you just need to add the custom values before you log something.

View file

@ -85,15 +85,15 @@ actors may of course also process commands that do not change application state
Akka persistence supports event sourcing with the ``AbstractPersistentActor`` abstract class. An actor that extends this
class uses the ``persist`` method to persist and handle events. The behavior of an ``AbstractPersistentActor``
is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is demonstrated in the following example.
is defined by implementing ``createReceiveRecover`` and ``createReceive``. This is demonstrated in the following example.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java#persistent-actor-example
The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The
``state`` of the ``ExamplePersistentActor`` is a list of persisted event data contained in ``ExampleState``.
The persistent actor's ``receiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt``
and ``SnapshotOffer`` messages. The persistent actor's ``receiveCommand`` method is a command handler. In this example,
The persistent actor's ``createReceiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt``
and ``SnapshotOffer`` messages. The persistent actor's ``createReceive`` method is a command handler. In this example,
a command is handled by generating two events which are then persisted and handled. Events are persisted by calling
``persist`` with an event (or a sequence of events) as first argument and an event handler as second argument.
@ -125,8 +125,8 @@ It contains instructions on how to run the ``PersistentActorExample``.
It's also possible to switch between different command handlers during normal processing and recovery
with ``getContext().become()`` and ``getContext().unbecome()``. To get the actor into the same state after
recovery you need to take special care to perform the same state transitions with ``become`` and
``unbecome`` in the ``receiveRecover`` method as you would have done in the command handler.
Note that when using ``become`` from ``receiveRecover`` it will still only use the ``receiveRecover``
``unbecome`` in the ``createReceiveRecover`` method as you would have done in the command handler.
Note that when using ``become`` from ``createReceiveRecover`` it will still only use the ``createReceiveRecover``
behavior when replaying the events. When replay is completed it will use the new behavior.
Identifiers

View file

@ -178,7 +178,7 @@ you can create child Typed Actors by invoking ``typedActorOf(..)`` on that.
.. includecode:: code/docs/actor/TypedActorDocTest.java
:include: typed-actor-hierarchy
You can also create a child Typed Actor in regular Akka Actors by giving the ``UntypedActorContext``
You can also create a child Typed Actor in regular Akka Actors by giving the ``AbstractActor.ActorContext``
as an input parameter to TypedActor.get(…).
Supervisor Strategy

View file

@ -5,7 +5,7 @@ import java.util.Collections;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.cluster.metrics.AdaptiveLoadBalancingGroup;
import akka.cluster.metrics.AdaptiveLoadBalancingPool;
import akka.cluster.routing.ClusterRouterGroup;
@ -16,7 +16,7 @@ import akka.cluster.metrics.HeapMetricsSelector;
import akka.cluster.metrics.SystemLoadAverageMetricsSelector;
//not used, only for documentation
abstract class FactorialFrontend2 extends UntypedActor {
abstract class FactorialFrontend2 extends AbstractActor {
//#router-lookup-in-code
int totalInstances = 100;
Iterable<String> routeesPaths = Arrays.asList("/user/factorialBackend", "");
@ -31,7 +31,7 @@ abstract class FactorialFrontend2 extends UntypedActor {
}
//not used, only for documentation
abstract class FactorialFrontend3 extends UntypedActor {
abstract class FactorialFrontend3 extends AbstractActor {
//#router-deploy-in-code
int totalInstances = 100;
int maxInstancesPerNode = 3;

View file

@ -1,38 +1,28 @@
package sample.cluster.factorial;
import java.math.BigInteger;
import java.util.concurrent.Callable;
import scala.concurrent.Future;
import akka.actor.UntypedActor;
import akka.dispatch.Mapper;
import akka.actor.AbstractActor;
import static akka.dispatch.Futures.future;
import static akka.pattern.Patterns.pipe;
//#backend
public class FactorialBackend extends UntypedActor {
public class FactorialBackend extends AbstractActor {
@Override
public void onReceive(Object message) {
if (message instanceof Integer) {
final Integer n = (Integer) message;
Future<BigInteger> f = future(new Callable<BigInteger>() {
public BigInteger call() {
return factorial(n);
}
}, getContext().dispatcher());
public Receive createReceive() {
return receiveBuilder()
.match(Integer.class, n -> {
Future<BigInteger> f = future(() -> factorial(n),
getContext().dispatcher());
Future<FactorialResult> result = f.map(
new Mapper<BigInteger, FactorialResult>() {
public FactorialResult apply(BigInteger factorial) {
return new FactorialResult(n, factorial);
}
}, getContext().dispatcher());
Future<FactorialResult> result =
f.map(factorial -> new FactorialResult(n, factorial),
getContext().dispatcher());
pipe(result, getContext().dispatcher()).to(getSender());
} else {
unhandled(message);
}
pipe(result, getContext().dispatcher()).to(sender());
})
.build();
}
BigInteger factorial(int n) {

View file

@ -4,13 +4,13 @@ import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.routing.FromConfig;
//#frontend
public class FactorialFrontend extends UntypedActor {
public class FactorialFrontend extends AbstractActor {
final int upToN;
final boolean repeat;
@ -31,24 +31,22 @@ public class FactorialFrontend extends UntypedActor {
}
@Override
public void onReceive(Object message) {
if (message instanceof FactorialResult) {
FactorialResult result = (FactorialResult) message;
public Receive createReceive() {
return receiveBuilder()
.match(FactorialResult.class, result -> {
if (result.n == upToN) {
log.debug("{}! = {}", result.n, result.factorial);
if (repeat)
sendJobs();
else
getContext().stop(getSelf());
getContext().stop(self());
}
} else if (message instanceof ReceiveTimeout) {
})
.match(ReceiveTimeout.class, x -> {
log.info("Timeout");
sendJobs();
} else {
unhandled(message);
}
})
.build();
}
void sendJobs() {

View file

@ -1,7 +1,7 @@
package sample.cluster.factorial;
//#metrics-listener
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.metrics.ClusterMetricsChanged;
@ -13,7 +13,7 @@ import akka.cluster.metrics.ClusterMetricsExtension;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MetricsListener extends UntypedActor {
public class MetricsListener extends AbstractActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system());
@ -33,23 +33,21 @@ public class MetricsListener extends UntypedActor {
extension.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof ClusterMetricsChanged) {
ClusterMetricsChanged clusterMetrics = (ClusterMetricsChanged) message;
public Receive createReceive() {
return receiveBuilder()
.match(ClusterMetricsChanged.class, clusterMetrics -> {
for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) {
if (nodeMetrics.address().equals(cluster.selfAddress())) {
logHeap(nodeMetrics);
logCpu(nodeMetrics);
}
}
} else if (message instanceof CurrentClusterState) {
})
.match(CurrentClusterState.class, message -> {
// Ignore.
} else {
unhandled(message);
}
})
.build();
}
void logHeap(NodeMetrics nodeMetrics) {

View file

@ -1,6 +1,6 @@
package sample.cluster.simple;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberEvent;
@ -10,7 +10,7 @@ import akka.cluster.ClusterEvent.UnreachableMember;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class SimpleClusterListener extends UntypedActor {
public class SimpleClusterListener extends AbstractActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system());
@ -18,7 +18,7 @@ public class SimpleClusterListener extends UntypedActor {
@Override
public void preStart() {
//#subscribe
cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(),
MemberEvent.class, UnreachableMember.class);
//#subscribe
}
@ -26,29 +26,24 @@ public class SimpleClusterListener extends UntypedActor {
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
cluster.unsubscribe(self());
}
@Override
public void onReceive(Object message) {
if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
public Receive createReceive() {
return receiveBuilder()
.match(MemberUp.class, mUp -> {
log.info("Member is Up: {}", mUp.member());
} else if (message instanceof UnreachableMember) {
UnreachableMember mUnreachable = (UnreachableMember) message;
})
.match(UnreachableMember.class, mUnreachable -> {
log.info("Member detected as unreachable: {}", mUnreachable.member());
} else if (message instanceof MemberRemoved) {
MemberRemoved mRemoved = (MemberRemoved) message;
})
.match(MemberRemoved.class, mRemoved -> {
log.info("Member is Removed: {}", mRemoved.member());
} else if (message instanceof MemberEvent) {
})
.match(MemberEvent.class, mEvent -> {
// ignore
} else {
unhandled(message);
}
})
.build();
}
}

View file

@ -1,6 +1,6 @@
package sample.cluster.simple;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberEvent;
@ -10,7 +10,7 @@ import akka.cluster.ClusterEvent.UnreachableMember;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class SimpleClusterListener2 extends UntypedActor {
public class SimpleClusterListener2 extends AbstractActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system());
@ -18,40 +18,34 @@ public class SimpleClusterListener2 extends UntypedActor {
@Override
public void preStart() {
//#subscribe
cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class);
cluster.subscribe(self(), MemberEvent.class, UnreachableMember.class);
//#subscribe
}
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
cluster.unsubscribe(self());
}
@Override
public void onReceive(Object message) {
if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
public Receive createReceive() {
return receiveBuilder()
.match(CurrentClusterState.class, state -> {
log.info("Current members: {}", state.members());
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
})
.match(MemberUp.class, mUp -> {
log.info("Member is Up: {}", mUp.member());
} else if (message instanceof UnreachableMember) {
UnreachableMember mUnreachable = (UnreachableMember) message;
})
.match(UnreachableMember.class, mUnreachable -> {
log.info("Member detected as unreachable: {}", mUnreachable.member());
} else if (message instanceof MemberRemoved) {
MemberRemoved mRemoved = (MemberRemoved) message;
})
.match(MemberRemoved.class, mRemoved -> {
log.info("Member is Removed: {}", mRemoved.member());
} else if (message instanceof MemberEvent) {
})
.match(MemberEvent.class, mEvent -> {
// ignore
} else {
unhandled(message);
}
})
.build();
}
}

View file

@ -4,7 +4,7 @@ import java.util.Collections;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.cluster.routing.ClusterRouterGroup;
import akka.cluster.routing.ClusterRouterGroupSettings;
import akka.cluster.routing.ClusterRouterPool;
@ -13,7 +13,7 @@ import akka.routing.ConsistentHashingGroup;
import akka.routing.ConsistentHashingPool;
//not used, only for documentation
abstract class StatsService2 extends UntypedActor {
abstract class StatsService2 extends AbstractActor {
//#router-lookup-in-code
int totalInstances = 100;
Iterable<String> routeesPaths = Collections
@ -28,7 +28,7 @@ abstract class StatsService2 extends UntypedActor {
}
//not used, only for documentation
abstract class StatsService3 extends UntypedActor {
abstract class StatsService3 extends AbstractActor {
//#router-deploy-in-code
int totalInstances = 100;
int maxInstancesPerNode = 3;

View file

@ -9,10 +9,10 @@ import sample.cluster.stats.StatsMessages.StatsResult;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
//#aggregator
public class StatsAggregator extends UntypedActor {
public class StatsAggregator extends AbstractActor {
final int expectedResults;
final ActorRef replyTo;
@ -29,27 +29,25 @@ public class StatsAggregator extends UntypedActor {
}
@Override
public void onReceive(Object message) {
if (message instanceof Integer) {
Integer wordCount = (Integer) message;
public Receive createReceive() {
return receiveBuilder()
.match(Integer.class, wordCount -> {
results.add(wordCount);
if (results.size() == expectedResults) {
int sum = 0;
for (int c : results)
sum += c;
double meanWordLength = ((double) sum) / results.size();
replyTo.tell(new StatsResult(meanWordLength), getSelf());
getContext().stop(getSelf());
replyTo.tell(new StatsResult(meanWordLength), self());
getContext().stop(self());
}
} else if (message == ReceiveTimeout.getInstance()) {
})
.match(ReceiveTimeout.class, timeout -> {
replyTo.tell(new JobFailed("Service unavailable, try again later"),
getSelf());
getContext().stop(getSelf());
} else {
unhandled(message);
}
self());
getContext().stop(self());
})
.build();
}
}

View file

@ -3,12 +3,12 @@ package sample.cluster.stats;
import sample.cluster.stats.StatsMessages.StatsJob;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope;
import akka.routing.FromConfig;
//#service
public class StatsService extends UntypedActor {
public class StatsService extends AbstractActor {
// This router is used both with lookup and deploy of routees. If you
// have a router with only lookup of routees you can use Props.empty()
@ -18,14 +18,11 @@ public class StatsService extends UntypedActor {
"workerRouter");
@Override
public void onReceive(Object message) {
if (message instanceof StatsJob) {
StatsJob job = (StatsJob) message;
if (job.getText().equals("")) {
unhandled(message);
} else {
public Receive createReceive() {
return receiveBuilder()
.match(StatsJob.class, job -> !"".equals(job.getText()), job -> {
final String[] words = job.getText().split(" ");
final ActorRef replyTo = getSender();
final ActorRef replyTo = sender();
// create actor that collects replies from workers
ActorRef aggregator = getContext().actorOf(
@ -36,11 +33,8 @@ public class StatsService extends UntypedActor {
workerRouter.tell(new ConsistentHashableEnvelope(word, word),
aggregator);
}
}
} else {
unhandled(message);
}
})
.build();
}
}

View file

@ -3,28 +3,25 @@ package sample.cluster.stats;
import java.util.HashMap;
import java.util.Map;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
//#worker
public class StatsWorker extends UntypedActor {
public class StatsWorker extends AbstractActor {
Map<String, Integer> cache = new HashMap<String, Integer>();
@Override
public void onReceive(Object message) {
if (message instanceof String) {
String word = (String) message;
public Receive createReceive() {
return receiveBuilder()
.match(String.class, word -> {
Integer length = cache.get(word);
if (length == null) {
length = word.length();
cache.put(word, length);
}
getSender().tell(length, getSelf());
} else {
unhandled(message);
sender().tell(length, self());
})
.build();
}
}
}
//#worker

View file

@ -3,7 +3,7 @@ package sample.cluster.transformation;
import static sample.cluster.transformation.TransformationMessages.BACKEND_REGISTRATION;
import sample.cluster.transformation.TransformationMessages.TransformationJob;
import sample.cluster.transformation.TransformationMessages.TransformationResult;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberUp;
@ -11,44 +11,40 @@ import akka.cluster.Member;
import akka.cluster.MemberStatus;
//#backend
public class TransformationBackend extends UntypedActor {
public class TransformationBackend extends AbstractActor {
Cluster cluster = Cluster.get(getContext().system());
//subscribe to cluster changes, MemberUp
@Override
public void preStart() {
cluster.subscribe(getSelf(), MemberUp.class);
cluster.subscribe(self(), MemberUp.class);
}
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
cluster.unsubscribe(self());
}
@Override
public void onReceive(Object message) {
if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
getSender().tell(new TransformationResult(job.getText().toUpperCase()),
getSelf());
} else if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
public Receive createReceive() {
return receiveBuilder()
.match(TransformationJob.class, job -> {
sender().tell(new TransformationResult(job.getText().toUpperCase()),
self());
})
.match(CurrentClusterState.class, state -> {
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
})
.match(MemberUp.class, mUp -> {
register(mUp.member());
} else {
unhandled(message);
}
})
.build();
}
void register(Member member) {

View file

@ -9,40 +9,35 @@ import sample.cluster.transformation.TransformationMessages.JobFailed;
import sample.cluster.transformation.TransformationMessages.TransformationJob;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.actor.AbstractActor;
//#frontend
public class TransformationFrontend extends UntypedActor {
public class TransformationFrontend extends AbstractActor {
List<ActorRef> backends = new ArrayList<ActorRef>();
int jobCounter = 0;
@Override
public void onReceive(Object message) {
if ((message instanceof TransformationJob) && backends.isEmpty()) {
TransformationJob job = (TransformationJob) message;
getSender().tell(
public Receive createReceive() {
return receiveBuilder()
.match(TransformationJob.class, job -> backends.isEmpty(), job -> {
sender().tell(
new JobFailed("Service unavailable, try again later", job),
getSender());
} else if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
sender());
})
.match(TransformationJob.class, job -> {
jobCounter++;
backends.get(jobCounter % backends.size())
.forward(job, getContext());
} else if (message.equals(BACKEND_REGISTRATION)) {
getContext().watch(getSender());
backends.add(getSender());
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
})
.matchEquals(BACKEND_REGISTRATION, m -> {
getContext().watch(sender());
backends.add(sender());
})
.match(Terminated.class, terminated -> {
backends.remove(terminated.getActor());
} else {
unhandled(message);
})
.build();
}
}
}
//#frontend

View file

@ -135,29 +135,29 @@ public class ShoppingCart extends AbstractActor {
public ShoppingCart(String userId) {
this.userId = userId;
this.dataKey = LWWMapKey.create("cart-" + userId);
receive(matchGetCart()
.orElse(matchAddItem())
.orElse(matchRemoveItem())
.orElse(matchOther()));
}
@Override
public Receive createReceive() {
return matchGetCart()
.orElse(matchAddItem())
.orElse(matchRemoveItem())
.orElse(matchOther());
}
//#get-cart
private PartialFunction<Object, BoxedUnit> matchGetCart() {
return ReceiveBuilder
.matchEquals((GET_CART),
s -> receiveGetCart())
.match(GetSuccess.class, g -> isResponseToGetCart(g),
private Receive matchGetCart() {
return receiveBuilder()
.matchEquals(GET_CART, s -> receiveGetCart())
.match(GetSuccess.class, this::isResponseToGetCart,
g -> receiveGetSuccess((GetSuccess<LWWMap<String, LineItem>>) g))
.match(NotFound.class, n -> isResponseToGetCart(n),
.match(NotFound.class, this::isResponseToGetCart,
n -> receiveNotFound((NotFound<LWWMap<String, LineItem>>) n))
.match(GetFailure.class, f -> isResponseToGetCart(f),
.match(GetFailure.class, this::isResponseToGetCart,
f -> receiveGetFailure((GetFailure<LWWMap<String, LineItem>>) f))
.build();
}
private void receiveGetCart() {
Optional<Object> ctx = Optional.of(sender());
replicator.tell(new Replicator.Get<LWWMap<String, LineItem>>(dataKey, readMajority, ctx),
@ -189,9 +189,9 @@ public class ShoppingCart extends AbstractActor {
//#get-cart
//#add-item
private PartialFunction<Object, BoxedUnit> matchAddItem() {
return ReceiveBuilder
.match(AddItem.class, r -> receiveAddItem(r))
private Receive matchAddItem() {
return receiveBuilder()
.match(AddItem.class, this::receiveAddItem)
.build();
}
@ -214,14 +214,14 @@ public class ShoppingCart extends AbstractActor {
}
}
private PartialFunction<Object, BoxedUnit> matchRemoveItem() {
return ReceiveBuilder
.match(RemoveItem.class, r -> receiveRemoveItem(r))
.match(GetSuccess.class, g -> isResponseToRemoveItem(g),
private Receive matchRemoveItem() {
return receiveBuilder()
.match(RemoveItem.class, this::receiveRemoveItem)
.match(GetSuccess.class, this::isResponseToRemoveItem,
g -> receiveRemoveItemGetSuccess((GetSuccess<LWWMap<String, LineItem>>) g))
.match(GetFailure.class, f -> isResponseToRemoveItem(f),
.match(GetFailure.class, this::isResponseToRemoveItem,
f -> receiveRemoveItemGetFailure((GetFailure<LWWMap<String, LineItem>>) f))
.match(NotFound.class, n -> isResponseToRemoveItem(n), n -> {/* nothing to remove */})
.match(NotFound.class, this::isResponseToRemoveItem, n -> {/* nothing to remove */})
.build();
}
@ -258,8 +258,8 @@ public class ShoppingCart extends AbstractActor {
}
//#remove-item
private PartialFunction<Object, BoxedUnit> matchOther() {
return ReceiveBuilder
private Receive matchOther() {
return receiveBuilder()
.match(UpdateSuccess.class, u -> {
// ok
})
@ -272,6 +272,4 @@ public class ShoppingCart extends AbstractActor {
.build();
}
}

View file

@ -9,11 +9,8 @@ package sample.persistence;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractPersistentActor;
import akka.persistence.SnapshotOffer;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import java.io.Serializable;
import java.util.ArrayList;
@ -88,31 +85,31 @@ class ExamplePersistentActor extends AbstractPersistentActor {
public String persistenceId() { return "sample-id-1"; }
@Override
public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(Evt.class, state::update).
match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot()).build();
public Receive createReceiveRecover() {
return receiveBuilder()
.match(Evt.class, state::update)
.match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot())
.build();
}
@Override
public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.
match(Cmd.class, c -> {
public Receive createReceive() {
return receiveBuilder()
.match(Cmd.class, c -> {
final String data = c.getData();
final Evt evt1 = new Evt(data + "-" + getNumEvents());
final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1));
persistAll(asList(evt1, evt2), (Evt evt) -> {
state.update(evt);
if (evt.equals(evt2)) {
context().system().eventStream().publish(evt);
getContext().system().eventStream().publish(evt);
}
});
}).
match(String.class, s -> s.equals("snap"), s -> saveSnapshot(state.copy())).
match(String.class, s -> s.equals("print"), s -> System.out.println(state)).
build();
})
.matchEquals("snap", s -> saveSnapshot(state.copy()))
.matchEquals("print", s -> System.out.println(state))
.build();
}
}
//#persistent-actor-example