!pro #17380 Build with Java 8

* genjavadoc adjustments for java8
This commit is contained in:
Patrik Nordwall 2015-05-05 16:44:49 +02:00
parent 67a9e62254
commit 0953e7aee3
41 changed files with 156 additions and 368 deletions

View file

@ -0,0 +1,664 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actorlambda;
import akka.actor.*;
import akka.japi.pf.ReceiveBuilder;
import akka.testkit.ErrorFilter;
import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import static docs.actorlambda.Messages.Swap.Swap;
import static docs.actorlambda.Messages.*;
import static akka.japi.Util.immutableSeq;
import java.util.concurrent.TimeUnit;
import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
//#import-props
import akka.actor.Props;
//#import-props
//#import-actorRef
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
//#import-actorRef
//#import-identify
import akka.actor.ActorIdentity;
import akka.actor.ActorSelection;
import akka.actor.Identify;
//#import-identify
//#import-graceFulStop
import akka.pattern.AskTimeoutException;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import scala.concurrent.Future;
import static akka.pattern.Patterns.gracefulStop;
//#import-graceFulStop
public class ActorDocTest {
public static Config config = ConfigFactory.parseString(
"akka {\n" +
" loggers = [\"akka.testkit.TestEventListener\"]\n" +
" loglevel = \"WARNING\"\n" +
" stdout-loglevel = \"WARNING\"\n" +
"}\n"
);
static ActorSystem system = null;
@BeforeClass
public static void beforeClass() {
system = ActorSystem.create("ActorDocTest", config);
}
@AfterClass
public static void afterClass() throws Exception {
Await.ready(system.terminate(), Duration.create("5 seconds"));
}
static
//#context-actorOf
public class FirstActor extends AbstractActor {
final ActorRef child = context().actorOf(Props.create(MyActor.class), "myChild");
//#plus-some-behavior
public FirstActor() {
receive(ReceiveBuilder.
matchAny(x -> {
sender().tell(x, self());
}).build()
);
}
//#plus-some-behavior
}
//#context-actorOf
static public abstract class SomeActor extends AbstractActor {
//#receive-constructor
public SomeActor() {
receive(ReceiveBuilder.
//#and-some-behavior
match(String.class, s -> System.out.println(s.toLowerCase())).
//#and-some-behavior
build());
}
//#receive-constructor
@Override
//#receive
public abstract PartialFunction<Object, BoxedUnit> receive();
//#receive
}
static public class ActorWithArgs extends AbstractActor {
private final String args;
ActorWithArgs(String args) {
this.args = args;
receive(ReceiveBuilder.
matchAny(x -> { }).build()
);
}
}
static
//#props-factory
public class DemoActor extends AbstractActor {
/**
* Create Props for an actor of this type.
* @param magicNumber The magic number to be passed to this actors constructor.
* @return a Props for creating this actor, which can then be further configured
* (e.g. calling `.withDispatcher()` on it)
*/
static Props props(Integer magicNumber) {
// You need to specify the actual type of the returned actor
// since Java 8 lambdas have some runtime type information erased
return Props.create(DemoActor.class, () -> new DemoActor(magicNumber));
}
private final Integer magicNumber;
DemoActor(Integer magicNumber) {
this.magicNumber = magicNumber;
receive(ReceiveBuilder.
match(Integer.class, i -> {
sender().tell(i + magicNumber, self());
}).build()
);
}
}
//#props-factory
static
//#props-factory
public class SomeOtherActor extends AbstractActor {
// Props(new DemoActor(42)) would not be safe
ActorRef demoActor = context().actorOf(DemoActor.props(42), "demo");
// ...
//#props-factory
public SomeOtherActor() {
receive(emptyBehavior());
}
//#props-factory
}
//#props-factory
static
//#messages-in-companion
public class DemoMessagesActor extends AbstractLoggingActor {
static public class Greeting {
private final String from;
public Greeting(String from) {
this.from = from;
}
public String getGreeter() {
return from;
}
}
DemoMessagesActor() {
receive(ReceiveBuilder.
match(Greeting.class, g -> {
log().info("I was greeted by {}", g.getGreeter());
}).build()
);
};
}
//#messages-in-companion
public static class Hook extends AbstractActor {
ActorRef target = null;
public Hook() {
receive(emptyBehavior());
}
//#preStart
@Override
public void preStart() {
target = context().actorOf(Props.create(MyActor.class, "target"));
}
//#preStart
//#postStop
@Override
public void postStop() {
//#clean-up-some-resources
final String message = "stopped";
//#tell
// dont forget to think about who is the sender (2nd argument)
target.tell(message, self());
//#tell
final Object result = "";
//#forward
target.forward(result, context());
//#forward
target = null;
//#clean-up-some-resources
}
//#postStop
// compilation test only
public void compileSelections() {
//#selection-local
// will look up this absolute path
context().actorSelection("/user/serviceA/actor");
// will look up sibling beneath same supervisor
context().actorSelection("../joe");
//#selection-local
//#selection-wildcard
// will look all children to serviceB with names starting with worker
context().actorSelection("/user/serviceB/worker*");
// will look up all siblings beneath same supervisor
context().actorSelection("../*");
//#selection-wildcard
//#selection-remote
context().actorSelection("akka.tcp://app@otherhost:1234/user/serviceB");
//#selection-remote
}
}
public static class ReplyException extends AbstractActor {
public ReplyException() {
receive(ReceiveBuilder.
matchAny(x -> {
//#reply-exception
try {
String result = operation();
sender().tell(result, self());
} catch (Exception e) {
sender().tell(new akka.actor.Status.Failure(e), self());
throw e;
}
//#reply-exception
}).build()
);
}
private String operation() {
return "Hi";
}
}
static
//#gracefulStop-actor
public class Manager extends AbstractActor {
private static enum Shutdown {
Shutdown
}
public static final Shutdown SHUTDOWN = Shutdown.Shutdown;
private ActorRef worker =
context().watch(context().actorOf(Props.create(Cruncher.class), "worker"));
public Manager() {
receive(ReceiveBuilder.
matchEquals("job", s -> {
worker.tell("crunch", self());
}).
matchEquals(SHUTDOWN, x -> {
worker.tell(PoisonPill.getInstance(), self());
context().become(shuttingDown);
}).build()
);
}
public PartialFunction<Object, BoxedUnit> shuttingDown =
ReceiveBuilder.
matchEquals("job", s -> {
sender().tell("service unavailable, shutting down", self());
}).
match(Terminated.class, t -> t.actor().equals(worker), t -> {
context().stop(self());
}).build();
}
//#gracefulStop-actor
@Test
public void usePatternsGracefulStop() throws Exception {
ActorRef actorRef = system.actorOf(Props.create(Manager.class));
//#gracefulStop
try {
Future<Boolean> stopped =
gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), Manager.SHUTDOWN);
Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));
// the actor has been stopped
} catch (AskTimeoutException e) {
// the actor wasn't stopped within 5 seconds
}
//#gracefulStop
}
public static class Cruncher extends AbstractActor {
public Cruncher() {
receive(ReceiveBuilder.
matchEquals("crunch", s -> { }).build()
);
}
}
static
//#swapper
public class Swapper extends AbstractLoggingActor {
public Swapper() {
receive(ReceiveBuilder.
matchEquals(Swap, s -> {
log().info("Hi");
context().become(ReceiveBuilder.
matchEquals(Swap, x -> {
log().info("Ho");
context().unbecome(); // resets the latest 'become' (just for fun)
}).build(), false); // push on top instead of replace
}).build()
);
}
}
//#swapper
static
//#swapper
public class SwapperApp {
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("SwapperSystem");
ActorRef swapper = system.actorOf(Props.create(Swapper.class), "swapper");
swapper.tell(Swap, ActorRef.noSender()); // logs Hi
swapper.tell(Swap, ActorRef.noSender()); // logs Ho
swapper.tell(Swap, ActorRef.noSender()); // logs Hi
swapper.tell(Swap, ActorRef.noSender()); // logs Ho
swapper.tell(Swap, ActorRef.noSender()); // logs Hi
swapper.tell(Swap, ActorRef.noSender()); // logs Ho
system.terminate();
}
}
//#swapper
@Test
public void creatingActorWithSystemActorOf() {
//#system-actorOf
// ActorSystem is a heavy object: create only one per application
final ActorSystem system = ActorSystem.create("MySystem", config);
final ActorRef myActor = system.actorOf(Props.create(MyActor.class), "myactor");
//#system-actorOf
try {
new JavaTestKit(system) {
{
myActor.tell("hello", getRef());
expectMsgEquals("hello");
}
};
} finally {
JavaTestKit.shutdownActorSystem(system);
}
}
@Test
public void creatingPropsConfig() {
//#creating-props
Props props1 = Props.create(MyActor.class);
Props props2 = Props.create(ActorWithArgs.class,
() -> new ActorWithArgs("arg")); // careful, see below
Props props3 = Props.create(ActorWithArgs.class, "arg");
//#creating-props
//#creating-props-deprecated
// NOT RECOMMENDED within another actor:
// encourages to close over enclosing class
Props props7 = Props.create(ActorWithArgs.class,
() -> new ActorWithArgs("arg"));
//#creating-props-deprecated
}
@Test(expected=IllegalArgumentException.class)
public void creatingPropsIllegal() {
//#creating-props-illegal
// This will throw an IllegalArgumentException since some runtime
// type information of the lambda is erased.
// Use Props.create(actorClass, Creator) instead.
Props props = Props.create(() -> new ActorWithArgs("arg"));
//#creating-props-illegal
}
static
//#receive-timeout
public class ReceiveTimeoutActor extends AbstractActor {
//#receive-timeout
ActorRef target = context().system().deadLetters();
//#receive-timeout
public ReceiveTimeoutActor() {
// To set an initial delay
context().setReceiveTimeout(Duration.create("10 seconds"));
receive(ReceiveBuilder.
matchEquals("Hello", s -> {
// To set in a response to a message
context().setReceiveTimeout(Duration.create("1 second"));
//#receive-timeout
target = sender();
target.tell("Hello world", self());
//#receive-timeout
}).
match(ReceiveTimeout.class, r -> {
// To turn it off
context().setReceiveTimeout(Duration.Undefined());
//#receive-timeout
target.tell("timeout", self());
//#receive-timeout
}).build()
);
}
}
//#receive-timeout
@Test
public void using_receiveTimeout() {
final ActorRef myActor = system.actorOf(Props.create(ReceiveTimeoutActor.class));
new JavaTestKit(system) {
{
myActor.tell("Hello", getRef());
expectMsgEquals("Hello world");
expectMsgEquals("timeout");
}
};
}
static
//#hot-swap-actor
public class HotSwapActor extends AbstractActor {
private PartialFunction<Object, BoxedUnit> angry;
private PartialFunction<Object, BoxedUnit> happy;
public HotSwapActor() {
angry =
ReceiveBuilder.
matchEquals("foo", s -> {
sender().tell("I am already angry?", self());
}).
matchEquals("bar", s -> {
context().become(happy);
}).build();
happy = ReceiveBuilder.
matchEquals("bar", s -> {
sender().tell("I am already happy :-)", self());
}).
matchEquals("foo", s -> {
context().become(angry);
}).build();
receive(ReceiveBuilder.
matchEquals("foo", s -> {
context().become(angry);
}).
matchEquals("bar", s -> {
context().become(happy);
}).build()
);
}
}
//#hot-swap-actor
@Test
public void using_hot_swap() {
final ActorRef actor = system.actorOf(Props.create(HotSwapActor.class), "hot");
new JavaTestKit(system) {
{
actor.tell("foo", getRef());
actor.tell("foo", getRef());
expectMsgEquals("I am already angry?");
actor.tell("bar", getRef());
actor.tell("bar", getRef());
expectMsgEquals("I am already happy :-)");
actor.tell("foo", getRef());
actor.tell("foo", getRef());
expectMsgEquals("I am already angry?");
expectNoMsg(Duration.create(1, TimeUnit.SECONDS));
}
};
}
static
//#stash
public class ActorWithProtocol extends AbstractActorWithStash {
public ActorWithProtocol() {
receive(ReceiveBuilder.
matchEquals("open", s -> {
context().become(ReceiveBuilder.
matchEquals("write", ws -> { /* do writing */ }).
matchEquals("close", cs -> {
unstashAll();
context().unbecome();
}).
matchAny(msg -> stash()).build(), false);
}).
matchAny(msg -> stash()).build()
);
}
}
//#stash
@Test
public void using_Stash() {
final ActorRef actor = system.actorOf(Props.create(ActorWithProtocol.class), "stash");
}
static
//#watch
public class WatchActor extends AbstractActor {
private final ActorRef child = context().actorOf(Props.empty(), "target");
private ActorRef lastSender = system.deadLetters();
public WatchActor() {
context().watch(child); // <-- this is the only call needed for registration
receive(ReceiveBuilder.
matchEquals("kill", s -> {
context().stop(child);
lastSender = sender();
}).
match(Terminated.class, t -> t.actor().equals(child), t -> {
lastSender.tell("finished", self());
}).build()
);
}
}
//#watch
@Test
public void using_watch() {
ActorRef actor = system.actorOf(Props.create(WatchActor.class));
new JavaTestKit(system) {
{
actor.tell("kill", getRef());
expectMsgEquals("finished");
}
};
}
static
//#identify
public class Follower extends AbstractActor {
final Integer identifyId = 1;
public Follower(){
ActorSelection selection = context().actorSelection("/user/another");
selection.tell(new Identify(identifyId), self());
receive(ReceiveBuilder.
match(ActorIdentity.class, id -> id.getRef() != null, id -> {
ActorRef ref = id.getRef();
context().watch(ref);
context().become(active(ref));
}).
match(ActorIdentity.class, id -> id.getRef() == null, id -> {
context().stop(self());
}).build()
);
}
final PartialFunction<Object, BoxedUnit> active(final ActorRef another) {
return ReceiveBuilder.
match(Terminated.class, t -> t.actor().equals(another), t -> {
context().stop(self());
}).build();
}
}
//#identify
@Test
public void using_Identify() {
ActorRef a = system.actorOf(Props.empty());
ActorRef b = system.actorOf(Props.create(Follower.class));
new JavaTestKit(system) {
{
watch(b);
system.stop(a);
assertEquals(expectMsgClass(Duration.create(2, TimeUnit.SECONDS), Terminated.class).actor(), b);
}
};
}
public static class NoReceiveActor extends AbstractActor {
}
@Test
public void noReceiveActor() {
EventFilter ex1 = new ErrorFilter(ActorInitializationException.class);
EventFilter[] ignoreExceptions = { ex1 };
try {
system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
new JavaTestKit(system) {{
final ActorRef victim = new EventFilter<ActorRef>(ActorInitializationException.class) {
protected ActorRef run() {
return system.actorOf(Props.create(NoReceiveActor.class), "victim");
}
}.message("Actor behavior has not been set with receive(...)").occurrences(1).exec();
assertEquals(true, victim.isTerminated());
}};
} finally {
system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions)));
}
}
public static class MultipleReceiveActor extends AbstractActor {
public MultipleReceiveActor() {
receive(ReceiveBuilder.
match(String.class, s1 -> s1.toLowerCase().equals("become"), s1 -> {
sender().tell(s1.toUpperCase(), self());
receive(ReceiveBuilder.
match(String.class, s2 -> {
sender().tell(s2.toLowerCase(), self());
}).build()
);
}).
match(String.class, s1 -> {
sender().tell(s1.toUpperCase(), self());
}).build()
);
}
}
@Test
public void multipleReceiveActor() {
EventFilter ex1 = new ErrorFilter(IllegalActorStateException.class);
EventFilter[] ignoreExceptions = { ex1 };
try {
system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
new JavaTestKit(system) {{
new EventFilter<Boolean>(IllegalActorStateException.class) {
protected Boolean run() {
ActorRef victim = system.actorOf(Props.create(MultipleReceiveActor.class), "victim2");
victim.tell("Foo", getRef());
expectMsgEquals("FOO");
victim.tell("bEcoMe", getRef());
expectMsgEquals("BECOME");
victim.tell("Foo", getRef());
// if it's upper case, then the actor was restarted
expectMsgEquals("FOO");
return true;
}
}.message("Actor behavior has already been set with receive(...), " +
"use context().become(...) to change it later").occurrences(1).exec();
}};
} finally {
system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions)));
}
}
}

View file

@ -0,0 +1,204 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actorlambda;
//#testkit
import akka.actor.*;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import scala.PartialFunction;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
import scala.concurrent.duration.Duration;
import akka.testkit.TestProbe;
//#testkit
import akka.testkit.ErrorFilter;
import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import akka.testkit.JavaTestKit;
import static java.util.concurrent.TimeUnit.SECONDS;
import static akka.japi.Util.immutableSeq;
import scala.Option;
import org.junit.Test;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import scala.runtime.BoxedUnit;
//#testkit
public class FaultHandlingTest {
//#testkit
public static Config config = ConfigFactory.parseString(
"akka {\n" +
" loggers = [\"akka.testkit.TestEventListener\"]\n" +
" loglevel = \"WARNING\"\n" +
" stdout-loglevel = \"WARNING\"\n" +
"}\n");
static
//#supervisor
public class Supervisor extends AbstractActor {
//#strategy
private static SupervisorStrategy strategy =
new OneForOneStrategy(10, Duration.create("1 minute"), DeciderBuilder.
match(ArithmeticException.class, e -> resume()).
match(NullPointerException.class, e -> restart()).
match(IllegalArgumentException.class, e -> stop()).
matchAny(o -> escalate()).build());
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
//#strategy
public Supervisor() {
receive(ReceiveBuilder.
match(Props.class, props -> {
sender().tell(context().actorOf(props), self());
}).build()
);
}
}
//#supervisor
static
//#supervisor2
public class Supervisor2 extends AbstractActor {
//#strategy2
private static SupervisorStrategy strategy =
new OneForOneStrategy(10, Duration.create("1 minute"), DeciderBuilder.
match(ArithmeticException.class, e -> resume()).
match(NullPointerException.class, e -> restart()).
match(IllegalArgumentException.class, e -> stop()).
matchAny(o -> escalate()).build());
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
//#strategy2
public Supervisor2() {
receive(ReceiveBuilder.
match(Props.class, props -> {
sender().tell(context().actorOf(props), self());
}).build()
);
}
@Override
public void preRestart(Throwable cause, Option<Object> msg) {
// do not kill all children, which is the default here
}
}
//#supervisor2
static
//#child
public class Child extends AbstractActor {
int state = 0;
public Child() {
receive(ReceiveBuilder.
match(Exception.class, exception -> { throw exception; }).
match(Integer.class, i -> state = i).
matchEquals("get", s -> sender().tell(state, self())).build()
);
}
}
//#child
//#testkit
static ActorSystem system;
Duration timeout = Duration.create(5, SECONDS);
@BeforeClass
public static void start() {
system = ActorSystem.create("FaultHandlingTest", config);
}
@AfterClass
public static void cleanup() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
@Test
public void mustEmploySupervisorStrategy() throws Exception {
// code here
//#testkit
EventFilter ex1 = new ErrorFilter(ArithmeticException.class);
EventFilter ex2 = new ErrorFilter(NullPointerException.class);
EventFilter ex3 = new ErrorFilter(IllegalArgumentException.class);
EventFilter ex4 = new ErrorFilter(Exception.class);
EventFilter[] ignoreExceptions = { ex1, ex2, ex3, ex4 };
system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
//#create
Props superprops = Props.create(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);
//#create
//#resume
child.tell(42, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
child.tell(new ArithmeticException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
//#resume
//#restart
child.tell(new NullPointerException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
//#restart
//#stop
final TestProbe probe = new TestProbe(system);
probe.watch(child);
child.tell(new IllegalArgumentException(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);
//#stop
//#escalate-kill
child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);
probe.watch(child);
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);
//#escalate-kill
//#escalate-restart
superprops = Props.create(Supervisor2.class);
supervisor = system.actorOf(superprops);
child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);
child.tell(23, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(23);
child.tell(new Exception(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
//#escalate-restart
//#testkit
}
}
//#testkit

View file

@ -0,0 +1,125 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actorlambda;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.FI;
import akka.japi.pf.ReceiveBuilder;
import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
public class InitializationDocTest {
static ActorSystem system = null;
@BeforeClass
public static void beforeClass() {
system = ActorSystem.create("InitializationDocTest");
}
@AfterClass
public static void afterClass() throws Exception {
Await.ready(system.terminate(), Duration.create("5 seconds"));
}
public static class MessageInitExample extends AbstractActor {
private String initializeMe = null;
public MessageInitExample() {
//#messageInit
receive(ReceiveBuilder.
matchEquals("init", m1 -> {
initializeMe = "Up and running";
context().become(ReceiveBuilder.
matchEquals("U OK?", m2 -> {
sender().tell(initializeMe, self());
}).build());
}).build()
//#messageInit
);
}
}
public class GenericMessage<T> {
T value;
public GenericMessage(T value) {
this.value = value;
}
}
public static class GenericActor extends AbstractActor {
public GenericActor() {
receive(ReceiveBuilder.match(GenericMessage.class, (GenericMessage<String> msg) -> {
GenericMessage<String> message = msg;
sender().tell(message.value.toUpperCase(), self());
}).build());
}
}
static class GenericActorWithPredicate extends AbstractActor {
public GenericActorWithPredicate() {
FI.TypedPredicate<GenericMessage<String>> typedPredicate = s -> !s.value.isEmpty();
receive(ReceiveBuilder.match(GenericMessage.class, typedPredicate, (GenericMessage<String> msg) -> {
sender().tell(msg.value.toUpperCase(), self());
}).build());
}
}
@Test
public void testIt() {
new JavaTestKit(system) {{
ActorRef testactor = system.actorOf(Props.create(MessageInitExample.class), "testactor");
String msg = "U OK?";
testactor.tell(msg, getRef());
expectNoMsg(Duration.create(1, TimeUnit.SECONDS));
testactor.tell("init", getRef());
testactor.tell(msg, getRef());
expectMsgEquals("Up and running");
}};
}
@Test
public void testGenericActor() {
new JavaTestKit(system) {{
ActorRef genericTestActor = system.actorOf(Props.create(GenericActor.class), "genericActor");
GenericMessage<String> genericMessage = new GenericMessage<String>("a");
genericTestActor.tell(genericMessage, getRef());
expectMsgEquals("A");
}};
}
@Test
public void actorShouldNotRespondForEmptyMessage() {
new JavaTestKit(system) {{
ActorRef genericTestActor = system.actorOf(Props.create(GenericActorWithPredicate.class), "genericActorWithPredicate");
GenericMessage<String> emptyGenericMessage = new GenericMessage<String>("");
GenericMessage<String> nonEmptyGenericMessage = new GenericMessage<String>("a");
genericTestActor.tell(emptyGenericMessage, getRef());
expectNoMsg();
genericTestActor.tell(nonEmptyGenericMessage, getRef());
expectMsgEquals("A");
}};
}
}

View file

@ -0,0 +1,149 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actorlambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class Messages {
static
//#immutable-message
public class ImmutableMessage {
private final int sequenceNumber;
private final List<String> values;
public ImmutableMessage(int sequenceNumber, List<String> values) {
this.sequenceNumber = sequenceNumber;
this.values = Collections.unmodifiableList(new ArrayList<String>(values));
}
public int getSequenceNumber() {
return sequenceNumber;
}
public List<String> getValues() {
return values;
}
}
//#immutable-message
public static class DoIt {
private final ImmutableMessage msg;
DoIt(ImmutableMessage msg) {
this.msg = msg;
}
public ImmutableMessage getMsg() {
return msg;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DoIt doIt = (DoIt) o;
if (!msg.equals(doIt.msg)) return false;
return true;
}
@Override
public int hashCode() {
return msg.hashCode();
}
@Override
public String toString() {
return "DoIt{" +
"msg=" + msg +
'}';
}
}
public static class Message {
final String str;
Message(String str) {
this.str = str;
}
public String getStr() {
return str;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Message message = (Message) o;
if (!str.equals(message.str)) return false;
return true;
}
@Override
public int hashCode() {
return str.hashCode();
}
@Override
public String toString() {
return "Message{" +
"str='" + str + '\'' +
'}';
}
}
public static enum Swap {
Swap
}
public static class Result {
final String x;
final String s;
public Result(String x, String s) {
this.x = x;
this.s = s;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((s == null) ? 0 : s.hashCode());
result = prime * result + ((x == null) ? 0 : x.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Result other = (Result) obj;
if (s == null) {
if (other.s != null)
return false;
} else if (!s.equals(other.s))
return false;
if (x == null) {
if (other.x != null)
return false;
} else if (!x.equals(other.x))
return false;
return true;
}
}
}

View file

@ -0,0 +1,33 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actorlambda;
//#imports
import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
//#imports
//#my-actor
public class MyActor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(context().system(), this);
public MyActor() {
receive(ReceiveBuilder.
match(String.class, s -> {
log.info("Received String message: {}", s);
//#my-actor
//#reply
sender().tell(s, self());
//#reply
//#my-actor
}).
matchAny(o -> log.info("received unknown message")).build()
);
}
}
//#my-actor

View file

@ -0,0 +1,36 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actorlambda;
//#sample-actor
import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
public class SampleActor extends AbstractActor {
private PartialFunction<Object, BoxedUnit> guarded = ReceiveBuilder.
match(String.class, s -> s.contains("guard"), s -> {
sender().tell("contains(guard): " + s, self());
context().unbecome();
}).build();
public SampleActor() {
receive(ReceiveBuilder.
match(Double.class, d -> {
sender().tell(d.isNaN() ? 0 : d, self());
}).
match(Integer.class, i -> {
sender().tell(i * 10, self());
}).
match(String.class, s -> s.startsWith("guard"), s -> {
sender().tell("startsWith(guard): " + s.toUpperCase(), self());
context().become(guarded, false);
}).build()
);
}
}
//#sample-actor

View file

@ -0,0 +1,55 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actorlambda;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
public class SampleActorTest {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("SampleActorTest");
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
@Test
public void testSampleActor()
{
new JavaTestKit(system) {{
final ActorRef subject = system.actorOf(Props.create(SampleActor.class), "sample-actor");
final ActorRef probeRef = getRef();
subject.tell(47.11, probeRef);
subject.tell("and no guard in the beginning", probeRef);
subject.tell("guard is a good thing", probeRef);
subject.tell(47.11, probeRef);
subject.tell(4711, probeRef);
subject.tell("and no guard in the beginning", probeRef);
subject.tell(4711, probeRef);
subject.tell("and an unmatched message", probeRef);
expectMsgEquals(47.11);
assertTrue(expectMsgClass(String.class).startsWith("startsWith(guard):"));
assertTrue(expectMsgClass(String.class).startsWith("contains(guard):"));
expectMsgEquals(47110);
expectNoMsg();
}};
}
}

View file

@ -0,0 +1,136 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actorlambda.fsm;
//#simple-imports
import akka.actor.AbstractFSM;
import akka.actor.ActorRef;
import akka.japi.pf.UnitMatch;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import scala.concurrent.duration.Duration;
//#simple-imports
import static docs.actorlambda.fsm.Buncher.Data;
import static docs.actorlambda.fsm.Buncher.State.*;
import static docs.actorlambda.fsm.Buncher.State;
import static docs.actorlambda.fsm.Buncher.Uninitialized.*;
import static docs.actorlambda.fsm.Events.*;
//#simple-fsm
public class Buncher extends AbstractFSM<State, Data> {
{
//#fsm-body
startWith(Idle, Uninitialized);
//#when-syntax
when(Idle,
matchEvent(SetTarget.class, Uninitialized.class,
(setTarget, uninitialized) ->
stay().using(new Todo(setTarget.getRef(), new LinkedList<>()))));
//#when-syntax
//#transition-elided
onTransition(
matchState(Active, Idle, () -> {
// reuse this matcher
final UnitMatch<Data> m = UnitMatch.create(
matchData(Todo.class,
todo -> todo.getTarget().tell(new Batch(todo.getQueue()), self())));
m.match(stateData());
}).
state(Idle, Active, () -> {/* Do something here */}));
//#transition-elided
when(Active, Duration.create(1, "second"),
matchEvent(Arrays.asList(Flush.class, StateTimeout()), Todo.class,
(event, todo) -> goTo(Idle).using(todo.copy(new LinkedList<>()))));
//#unhandled-elided
whenUnhandled(
matchEvent(Queue.class, Todo.class,
(queue, todo) -> goTo(Active).using(todo.addElement(queue.getObj()))).
anyEvent((event, state) -> {
log().warning("received unhandled request {} in state {}/{}",
event, stateName(), state);
return stay();
}));
//#unhandled-elided
initialize();
//#fsm-body
}
//#simple-fsm
static
//#simple-state
// states
enum State {
Idle, Active
}
//#simple-state
static
//#simple-state
// state data
interface Data {
}
//#simple-state
static
//#simple-state
enum Uninitialized implements Data {
Uninitialized
}
//#simple-state
static
//#simple-state
final class Todo implements Data {
private final ActorRef target;
private final List<Object> queue;
public Todo(ActorRef target, List<Object> queue) {
this.target = target;
this.queue = queue;
}
public ActorRef getTarget() {
return target;
}
public List<Object> getQueue() {
return queue;
}
//#boilerplate
@Override
public String toString() {
return "Todo{" +
"target=" + target +
", queue=" + queue +
'}';
}
public Todo addElement(Object element) {
List<Object> nQueue = new LinkedList<>(queue);
nQueue.add(element);
return new Todo(this.target, nQueue);
}
public Todo copy(List<Object> queue) {
return new Todo(this.target, queue);
}
public Todo copy(ActorRef target) {
return new Todo(target, this.queue);
}
//#boilerplate
}
//#simple-state
//#simple-fsm
}
//#simple-fsm

View file

@ -0,0 +1,78 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actorlambda.fsm;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.LinkedList;
import docs.actorlambda.fsm.*;
import static docs.actorlambda.fsm.Events.Batch;
import static docs.actorlambda.fsm.Events.Queue;
import static docs.actorlambda.fsm.Events.SetTarget;
import static docs.actorlambda.fsm.Events.Flush.Flush;
//#test-code
public class BuncherTest {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("BuncherTest");
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
@Test
public void testBuncherActorBatchesCorrectly() {
new JavaTestKit(system) {{
final ActorRef buncher =
system.actorOf(Props.create(Buncher.class));
final ActorRef probe = getRef();
buncher.tell(new SetTarget(probe), probe);
buncher.tell(new Queue(42), probe);
buncher.tell(new Queue(43), probe);
LinkedList<Object> list1 = new LinkedList<>();
list1.add(42);
list1.add(43);
expectMsgEquals(new Batch(list1));
buncher.tell(new Queue(44), probe);
buncher.tell(Flush, probe);
buncher.tell(new Queue(45), probe);
LinkedList<Object> list2 = new LinkedList<>();
list2.add(44);
expectMsgEquals(new Batch(list2));
LinkedList<Object> list3 = new LinkedList<>();
list3.add(45);
expectMsgEquals(new Batch(list3));
system.stop(buncher);
}};
}
@Test
public void testBuncherActorDoesntBatchUninitialized() {
new JavaTestKit(system) {{
final ActorRef buncher =
system.actorOf(Props.create(Buncher.class));
final ActorRef probe = getRef();
buncher.tell(new Queue(42), probe);
expectNoMsg();
system.stop(buncher);
}};
}
}
//#test-code

View file

@ -0,0 +1,108 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actorlambda.fsm;
import akka.actor.ActorRef;
import java.util.List;
public class Events {
static
//#simple-events
public final class SetTarget {
private final ActorRef ref;
public SetTarget(ActorRef ref) {
this.ref = ref;
}
public ActorRef getRef() {
return ref;
}
//#boilerplate
@Override
public String toString() {
return "SetTarget{" +
"ref=" + ref +
'}';
}
//#boilerplate
}
//#simple-events
static
//#simple-events
public final class Queue {
private final Object obj;
public Queue(Object obj) {
this.obj = obj;
}
public Object getObj() {
return obj;
}
//#boilerplate
@Override
public String toString() {
return "Queue{" +
"obj=" + obj +
'}';
}
//#boilerplate
}
//#simple-events
static
//#simple-events
public final class Batch {
private final List<Object> list;
public Batch(List<Object> list) {
this.list = list;
}
public List<Object> getList() {
return list;
}
//#boilerplate
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Batch batch = (Batch) o;
return list.equals(batch.list);
}
@Override
public int hashCode() {
return list.hashCode();
}
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
builder.append( "Batch{list=");
list.stream().forEachOrdered(e -> { builder.append(e); builder.append(","); });
int len = builder.length();
builder.replace(len, len, "}");
return builder.toString();
}
//#boilerplate
}
//#simple-events
static
//#simple-events
public enum Flush {
Flush
}
//#simple-events
}

View file

@ -0,0 +1,179 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actorlambda.fsm;
import akka.actor.*;
import akka.testkit.JavaTestKit;
import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.Duration;
import static org.junit.Assert.*;
import static docs.actorlambda.fsm.FSMDocTest.StateType.*;
import static docs.actorlambda.fsm.FSMDocTest.Messages.*;
import static java.util.concurrent.TimeUnit.*;
public class FSMDocTest {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("FSMDocTest");
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
public static enum StateType {
SomeState,
Processing,
Idle,
Active,
Error
}
public static enum Messages {
WillDo,
Tick
}
public static enum Data {
Foo,
Bar
};
public static interface X {};
public static class DummyFSM extends AbstractFSM<StateType, Integer> {
Integer newData = 42;
//#alt-transition-syntax
public void handler(StateType from, StateType to) {
// handle transition here
}
//#alt-transition-syntax
{
//#modifier-syntax
when(SomeState, matchAnyEvent((msg, data) -> {
return goTo(Processing).using(newData).
forMax(Duration.create(5, SECONDS)).replying(WillDo);
}));
//#modifier-syntax
//#NullFunction
when(SomeState, AbstractFSM.NullFunction());
//#NullFunction
//#transition-syntax
onTransition(
matchState(Active, Idle, () -> setTimer("timeout",
Tick, Duration.create(1, SECONDS), true)).
state(Active, null, () -> cancelTimer("timeout")).
state(null, Idle, (f, t) -> log().info("entering Idle from " + f)));
//#transition-syntax
//#alt-transition-syntax
onTransition(this::handler);
//#alt-transition-syntax
//#stop-syntax
when(Error, matchEventEquals("stop", (event, data) -> {
// do cleanup ...
return stop();
}));
//#stop-syntax
//#termination-syntax
onTermination(
matchStop(Normal(),
(state, data) -> {/* Do something here */}).
stop(Shutdown(),
(state, data) -> {/* Do something here */}).
stop(Failure.class,
(reason, state, data) -> {/* Do something here */}));
//#termination-syntax
//#unhandled-syntax
whenUnhandled(
matchEvent(X.class, (x, data) -> {
log().info("Received unhandled event: " + x);
return stay();
}).
anyEvent((event, data) -> {
log().warning("Received unknown event: " + event);
return goTo(Error);
}));
}
//#unhandled-syntax
}
static
//#logging-fsm
public class MyFSM extends AbstractLoggingFSM<StateType, Data> {
//#body-elided
//#logging-fsm
ActorRef target = null;
//#logging-fsm
@Override
public int logDepth() { return 12; }
{
onTermination(
matchStop(Failure.class, (reason, state, data) -> {
String lastEvents = getLog().mkString("\n\t");
log().warning("Failure in state " + state + " with data " + data + "\n" +
"Events leading up to this point:\n\t" + lastEvents);
//#logging-fsm
target.tell(reason.cause(), self());
target.tell(state, self());
target.tell(data, self());
target.tell(lastEvents, self());
//#logging-fsm
})
);
//...
//#logging-fsm
startWith(SomeState, Data.Foo);
when(SomeState, matchEvent(ActorRef.class, Data.class, (ref, data) -> {
target = ref;
target.tell("going active", self());
return goTo(Active);
}));
when(Active, matchEventEquals("stop", (event, data) -> {
target.tell("stopping", self());
return stop(new Failure("This is not the error you're looking for"));
}));
initialize();
//#logging-fsm
}
//#body-elided
}
//#logging-fsm
@Test
public void testLoggingFSM()
{
new JavaTestKit(system) {{
final ActorRef logger =
system.actorOf(Props.create(MyFSM.class));
final ActorRef probe = getRef();
logger.tell(probe, probe);
expectMsgEquals("going active");
logger.tell("stop", probe);
expectMsgEquals("stopping");
expectMsgEquals("This is not the error you're looking for");
expectMsgEquals(Active);
expectMsgEquals(Data.Foo);
String msg = expectMsgClass(String.class);
assertTrue(msg.startsWith("LogEntry(SomeState,Foo,Actor[akka://FSMDocTest/system/"));
}};
}
}

View file

@ -0,0 +1,467 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actorlambda.japi;
//#all
//#imports
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import akka.actor.*;
import akka.dispatch.Mapper;
import akka.event.LoggingReceive;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.util.Timeout;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import scala.concurrent.duration.Duration;
import static akka.japi.Util.classTag;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
import static docs.actorlambda.japi.FaultHandlingDocSample.WorkerApi.*;
import static docs.actorlambda.japi.FaultHandlingDocSample.CounterServiceApi.*;
import static docs.actorlambda.japi.FaultHandlingDocSample.CounterApi.*;
import static docs.actorlambda.japi.FaultHandlingDocSample.StorageApi.*;
//#imports
public class FaultHandlingDocSample {
/**
* Runs the sample
*/
public static void main(String[] args) {
Config config = ConfigFactory.parseString(
"akka.loglevel = \"DEBUG\"\n" +
"akka.actor.debug {\n" +
" receive = on\n" +
" lifecycle = on\n" +
"}\n");
ActorSystem system = ActorSystem.create("FaultToleranceSample", config);
ActorRef worker = system.actorOf(Props.create(Worker.class), "worker");
ActorRef listener = system.actorOf(Props.create(Listener.class), "listener");
// start the work and listen on progress
// note that the listener is used as sender of the tell,
// i.e. it will receive replies from the worker
worker.tell(Start, listener);
}
/**
* Listens on progress from the worker and shuts down the system when enough
* work has been done.
*/
public static class Listener extends AbstractLoggingActor {
@Override
public void preStart() {
// If we don't get any progress within 15 seconds then the service
// is unavailable
context().setReceiveTimeout(Duration.create("15 seconds"));
}
public Listener() {
receive(LoggingReceive.create(ReceiveBuilder.
match(Progress.class, progress -> {
log().info("Current progress: {} %", progress.percent);
if (progress.percent >= 100.0) {
log().info("That's all, shutting down");
context().system().terminate();
}
}).
matchEquals(ReceiveTimeout.getInstance(), x -> {
// No progress within 15 seconds, ServiceUnavailable
log().error("Shutting down due to unavailable service");
context().system().terminate();
}).build(), context()
));
}
}
//#messages
public interface WorkerApi {
public static final Object Start = "Start";
public static final Object Do = "Do";
public static class Progress {
public final double percent;
public Progress(double percent) {
this.percent = percent;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), percent);
}
}
}
//#messages
/**
* Worker performs some work when it receives the Start message. It will
* continuously notify the sender of the Start message of current Progress.
* The Worker supervise the CounterService.
*/
public static class Worker extends AbstractLoggingActor {
final Timeout askTimeout = new Timeout(Duration.create(5, "seconds"));
// The sender of the initial Start message will continuously be notified
// about progress
ActorRef progressListener;
final ActorRef counterService = context().actorOf(
Props.create(CounterService.class), "counter");
final int totalCount = 51;
// Stop the CounterService child if it throws ServiceUnavailable
private static final SupervisorStrategy strategy =
new OneForOneStrategy(DeciderBuilder.
match(ServiceUnavailable.class, e -> stop()).
matchAny(o -> escalate()).build());
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
public Worker() {
receive(LoggingReceive.create(ReceiveBuilder.
matchEquals(Start, x -> progressListener == null, x -> {
progressListener = sender();
context().system().scheduler().schedule(
Duration.Zero(), Duration.create(1, "second"), self(), Do,
context().dispatcher(), null
);
}).
matchEquals(Do, x -> {
counterService.tell(new Increment(1), self());
counterService.tell(new Increment(1), self());
counterService.tell(new Increment(1), self());
// Send current progress to the initial sender
pipe(ask(counterService, GetCurrentCount, askTimeout)
.mapTo(classTag(CurrentCount.class))
.map(new Mapper<CurrentCount, Progress>() {
public Progress apply(CurrentCount c) {
return new Progress(100.0 * c.count / totalCount);
}
}, context().dispatcher()), context().dispatcher())
.to(progressListener);
}).build(), context())
);
}
}
//#messages
public interface CounterServiceApi {
public static final Object GetCurrentCount = "GetCurrentCount";
public static class CurrentCount {
public final String key;
public final long count;
public CurrentCount(String key, long count) {
this.key = key;
this.count = count;
}
public String toString() {
return String.format("%s(%s, %s)", getClass().getSimpleName(), key, count);
}
}
public static class Increment {
public final long n;
public Increment(long n) {
this.n = n;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), n);
}
}
public static class ServiceUnavailable extends RuntimeException {
private static final long serialVersionUID = 1L;
public ServiceUnavailable(String msg) {
super(msg);
}
}
}
//#messages
/**
* Adds the value received in Increment message to a persistent counter.
* Replies with CurrentCount when it is asked for CurrentCount. CounterService
* supervise Storage and Counter.
*/
public static class CounterService extends AbstractLoggingActor {
// Reconnect message
static final Object Reconnect = "Reconnect";
private static class SenderMsgPair {
final ActorRef sender;
final Object msg;
SenderMsgPair(ActorRef sender, Object msg) {
this.msg = msg;
this.sender = sender;
}
}
final String key = self().path().name();
ActorRef storage;
ActorRef counter;
final List<SenderMsgPair> backlog = new ArrayList<>();
final int MAX_BACKLOG = 10000;
// Restart the storage child when StorageException is thrown.
// After 3 restarts within 5 seconds it will be stopped.
private static final SupervisorStrategy strategy =
new OneForOneStrategy(3, Duration.create("5 seconds"), DeciderBuilder.
match(StorageException.class, e -> restart()).
matchAny(o -> escalate()).build());
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
@Override
public void preStart() {
initStorage();
}
/**
* The child storage is restarted in case of failure, but after 3 restarts,
* and still failing it will be stopped. Better to back-off than
* continuously failing. When it has been stopped we will schedule a
* Reconnect after a delay. Watch the child so we receive Terminated message
* when it has been terminated.
*/
void initStorage() {
storage = context().watch(context().actorOf(
Props.create(Storage.class), "storage"));
// Tell the counter, if any, to use the new storage
if (counter != null)
counter.tell(new UseStorage(storage), self());
// We need the initial value to be able to operate
storage.tell(new Get(key), self());
}
public CounterService() {
receive(LoggingReceive.create(ReceiveBuilder.
match(Entry.class, entry -> entry.key.equals(key) && counter == null, entry -> {
// Reply from Storage of the initial value, now we can create the Counter
final long value = entry.value;
counter = context().actorOf(Props.create(Counter.class, key, value));
// Tell the counter to use current storage
counter.tell(new UseStorage(storage), self());
// and send the buffered backlog to the counter
for (SenderMsgPair each : backlog) {
counter.tell(each.msg, each.sender);
}
backlog.clear();
}).
match(Increment.class, increment -> {
forwardOrPlaceInBacklog(increment);
}).
matchEquals(GetCurrentCount, gcc -> {
forwardOrPlaceInBacklog(gcc);
}).
match(Terminated.class, o -> {
// After 3 restarts the storage child is stopped.
// We receive Terminated because we watch the child, see initStorage.
storage = null;
// Tell the counter that there is no storage for the moment
counter.tell(new UseStorage(null), self());
// Try to re-establish storage after while
context().system().scheduler().scheduleOnce(
Duration.create(10, "seconds"), self(), Reconnect,
context().dispatcher(), null);
}).
matchEquals(Reconnect, o -> {
// Re-establish storage after the scheduled delay
initStorage();
}).build(), context())
);
}
void forwardOrPlaceInBacklog(Object msg) {
// We need the initial value from storage before we can start delegate to
// the counter. Before that we place the messages in a backlog, to be sent
// to the counter when it is initialized.
if (counter == null) {
if (backlog.size() >= MAX_BACKLOG)
throw new ServiceUnavailable("CounterService not available," +
" lack of initial value");
backlog.add(new SenderMsgPair(sender(), msg));
} else {
counter.forward(msg, context());
}
}
}
//#messages
public interface CounterApi {
public static class UseStorage {
public final ActorRef storage;
public UseStorage(ActorRef storage) {
this.storage = storage;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), storage);
}
}
}
//#messages
/**
* The in memory count variable that will send current value to the Storage,
* if there is any storage available at the moment.
*/
public static class Counter extends AbstractLoggingActor {
final String key;
long count;
ActorRef storage;
public Counter(String key, long initialValue) {
this.key = key;
this.count = initialValue;
receive(LoggingReceive.create(ReceiveBuilder.
match(UseStorage.class, useStorage -> {
storage = useStorage.storage;
storeCount();
}).
match(Increment.class, increment -> {
count += increment.n;
storeCount();
}).
matchEquals(GetCurrentCount, gcc -> {
sender().tell(new CurrentCount(key, count), self());
}).build(), context())
);
}
void storeCount() {
// Delegate dangerous work, to protect our valuable state.
// We can continue without storage.
if (storage != null) {
storage.tell(new Store(new Entry(key, count)), self());
}
}
}
//#messages
public interface StorageApi {
public static class Store {
public final Entry entry;
public Store(Entry entry) {
this.entry = entry;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), entry);
}
}
public static class Entry {
public final String key;
public final long value;
public Entry(String key, long value) {
this.key = key;
this.value = value;
}
public String toString() {
return String.format("%s(%s, %s)", getClass().getSimpleName(), key, value);
}
}
public static class Get {
public final String key;
public Get(String key) {
this.key = key;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), key);
}
}
public static class StorageException extends RuntimeException {
private static final long serialVersionUID = 1L;
public StorageException(String msg) {
super(msg);
}
}
}
//#messages
/**
* Saves key/value pairs to persistent storage when receiving Store message.
* Replies with current value when receiving Get message. Will throw
* StorageException if the underlying data store is out of order.
*/
public static class Storage extends AbstractLoggingActor {
final DummyDB db = DummyDB.instance;
public Storage() {
receive(LoggingReceive.create(ReceiveBuilder.
match(Store.class, store -> {
db.save(store.entry.key, store.entry.value);
}).
match(Get.class, get -> {
Long value = db.load(get.key);
sender().tell(new Entry(get.key, value == null ?
Long.valueOf(0L) : value), self());
}).build(), context())
);
}
}
//#dummydb
public static class DummyDB {
public static final DummyDB instance = new DummyDB();
private final Map<String, Long> db = new HashMap<String, Long>();
private DummyDB() {
}
public synchronized void save(String key, Long value) throws StorageException {
if (11 <= value && value <= 14)
throw new StorageException("Simulated store failure " + value);
db.put(key, value);
}
public synchronized Long load(String key) throws StorageException {
return db.get(key);
}
}
//#dummydb
}
//#all

View file

@ -0,0 +1,125 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io;
//#imports
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.io.Inet;
import akka.io.Udp;
import akka.io.UdpMessage;
import akka.util.ByteString;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.StandardProtocolFamily;
import java.net.DatagramSocket;
import java.nio.channels.DatagramChannel;
import java.util.ArrayList;
import java.util.List;
//#imports
public class JavaUdpMulticast {
//#inet6-protocol-family
public static class Inet6ProtocolFamily extends Inet.DatagramChannelCreator {
@Override
public DatagramChannel create() throws Exception {
return DatagramChannel.open(StandardProtocolFamily.INET6);
}
}
//#inet6-protocol-family
//#multicast-group
public static class MulticastGroup extends Inet.AbstractSocketOptionV2 {
private String address;
private String interf;
public MulticastGroup(String address, String interf) {
this.address = address;
this.interf = interf;
}
@Override
public void afterBind(DatagramSocket s) {
try {
InetAddress group = InetAddress.getByName(address);
NetworkInterface networkInterface = NetworkInterface.getByName(interf);
s.getChannel().join(group, networkInterface);
} catch (Exception ex) {
System.out.println("Unable to join multicast group.");
}
}
}
//#multicast-group
public static class Listener extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
ActorRef sink;
public Listener(String iface, String group, Integer port, ActorRef sink) {
this.sink = sink;
//#bind
List<Inet.SocketOption> options = new ArrayList<>();
options.add(new Inet6ProtocolFamily());
options.add(new MulticastGroup(group, iface));
final ActorRef mgr = Udp.get(getContext().system()).getManager();
// listen for datagrams on this address
InetSocketAddress endpoint = new InetSocketAddress(port);
mgr.tell(UdpMessage.bind(getSelf(), endpoint, options), getSelf());
//#bind
}
@Override
public void onReceive(Object msg) {
if (msg instanceof Udp.Bound) {
final Udp.Bound b = (Udp.Bound) msg;
log.info("Bound to {}", b.localAddress());
sink.tell(b, getSelf());
} else if (msg instanceof Udp.Received) {
final Udp.Received r = (Udp.Received) msg;
final String txt = r.data().decodeString("utf-8");
log.info("Received '{}' from {}", txt, r.sender());
sink.tell(txt, getSelf());
} else unhandled(msg);
}
}
public static class Sender extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
String iface;
String group;
Integer port;
String message;
public Sender(String iface, String group, Integer port, String msg) {
this.iface = iface;
this.group = group;
this.port = port;
this.message = msg;
List<Inet.SocketOption> options = new ArrayList<>();
options.add(new Inet6ProtocolFamily());
final ActorRef mgr = Udp.get(getContext().system()).getManager();
mgr.tell(UdpMessage.simpleSender(options), getSelf());
}
@Override
public void onReceive(Object msg) {
if (msg instanceof Udp.SimpleSenderReady) {
InetSocketAddress remote = new InetSocketAddress(group + "%" + iface, port);
log.info("Sending message to " + remote);
getSender().tell(UdpMessage.send(ByteString.fromString(message), remote), getSelf());
} else unhandled(msg);
}
}
}

View file

@ -0,0 +1,74 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.io.Udp;
import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Enumeration;
import java.util.Random;
public class JavaUdpMulticastTest {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("JavaUdpMulticastTest");
}
@Test
public void testUdpMulticast() throws Exception {
new JavaTestKit(system) {{
NetworkInterface ipv6Iface = null;
for (Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces(); interfaces.hasMoreElements() && ipv6Iface == null;) {
NetworkInterface interf = interfaces.nextElement();
for (Enumeration<InetAddress> addresses = interf.getInetAddresses(); addresses.hasMoreElements() && ipv6Iface == null;) {
InetAddress address = addresses.nextElement();
if (address instanceof Inet6Address) {
ipv6Iface = interf;
}
}
}
// host assigned link local multicast address http://tools.ietf.org/html/rfc3307#section-4.3.2
// generate a random 32 bit multicast address with the high order bit set
final String randomAddress = Long.toHexString(((long) Math.abs(new Random().nextInt())) | (1L << 31)).toUpperCase();
final StringBuilder groupBuilder = new StringBuilder("FF02:");
for (int i = 0; i < 2; i += 1) {
groupBuilder.append(":");
groupBuilder.append(randomAddress.subSequence(i * 4, i * 4 + 4));
}
final String group = groupBuilder.toString();
final Integer port = TestUtils.temporaryUdpIpv6Port(ipv6Iface);
final String msg = "ohi";
final ActorRef sink = getRef();
final String iface = ipv6Iface.getName();
final ActorRef listener = system.actorOf(Props.create(JavaUdpMulticast.Listener.class, iface, group, port, sink));
expectMsgClass(Udp.Bound.class);
final ActorRef sender = system.actorOf(Props.create(JavaUdpMulticast.Sender.class, iface, group, port, msg));
expectMsgEquals(msg);
// unbind
system.stop(listener);
}};
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
}

View file

@ -95,12 +95,12 @@ To select a Protocol Family you must extend ``akka.io.Inet.DatagramChannelCreato
class which implements ``akka.io.Inet.SocketOption``. Provide custom logic
for opening a datagram channel by overriding :meth:`create` method.
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java#inet6-protocol-family
.. includecode:: code/docs/io/JavaUdpMulticast.java#inet6-protocol-family
Another socket option will be needed to join a multicast group.
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java#multicast-group
.. includecode:: code/docs/io/JavaUdpMulticast.java#multicast-group
Socket options must be provided to :meth:`UdpMessage.bind` command.
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java#bind
.. includecode:: code/docs/io/JavaUdpMulticast.java#bind

View file

@ -51,7 +51,7 @@ function there is a builder named ``ReceiveBuilder`` that you can use.
Here is an example:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/MyActor.java
.. includecode:: code/docs/actorlambda/MyActor.java
:include: imports,my-actor
Please note that the Akka Actor ``receive`` message loop is exhaustive, which
@ -80,8 +80,8 @@ creating an actor including associated deployment information (e.g. which
dispatcher to use, see more below). Here are some examples of how to create a
:class:`Props` instance.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#import-props
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#creating-props
.. includecode:: code/docs/actorlambda/ActorDocTest.java#import-props
.. includecode:: code/docs/actorlambda/ActorDocTest.java#creating-props
The second variant shows how to pass constructor arguments to the
:class:`Actor` being created, but it should only be used outside of actors as
@ -96,7 +96,7 @@ found.
Dangerous Variants
^^^^^^^^^^^^^^^^^^
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#creating-props-deprecated
.. includecode:: code/docs/actorlambda/ActorDocTest.java#creating-props-deprecated
This method is not recommended to be used within another actor because it
encourages to close over the enclosing scope, resulting in non-serializable
@ -128,14 +128,14 @@ associated with using the ``Props.create(...)`` method which takes a by-name
argument, since within a companion object the given code block will not retain
a reference to its enclosing scope:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#props-factory
.. includecode:: code/docs/actorlambda/ActorDocTest.java#props-factory
Another good practice is to declare what messages an Actor can receive
as close to the actor definition as possible (e.g. as static classes
inside the Actor or using other suitable class), which makes it easier to know
what it can receive.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#messages-in-companion
.. includecode:: code/docs/actorlambda/ActorDocTest.java#messages-in-companion
Creating Actors with Props
--------------------------
@ -144,14 +144,14 @@ Actors are created by passing a :class:`Props` instance into the
:meth:`actorOf` factory method which is available on :class:`ActorSystem` and
:class:`ActorContext`.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#import-actorRef
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#system-actorOf
.. includecode:: code/docs/actorlambda/ActorDocTest.java#import-actorRef
.. includecode:: code/docs/actorlambda/ActorDocTest.java#system-actorOf
Using the :class:`ActorSystem` will create top-level actors, supervised by the
actor systems provided guardian actor, while using an actors context will
create a child actor.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#context-actorOf
.. includecode:: code/docs/actorlambda/ActorDocTest.java#context-actorOf
:exclude: plus-some-behavior
It is recommended to create a hierarchy of children, grand-children and so on
@ -321,7 +321,7 @@ termination (see `Stopping Actors`_). This service is provided by the
Registering a monitor is easy:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#watch
.. includecode:: code/docs/actorlambda/ActorDocTest.java#watch
It should be noted that the :class:`Terminated` message is generated
independent of the order in which registration and termination occur.
@ -348,7 +348,7 @@ Start Hook
Right after starting the actor, its :meth:`preStart` method is invoked.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#preStart
.. includecode:: code/docs/actorlambda/ActorDocTest.java#preStart
This method is called when the actor is first created. During restarts it is
called by the default implementation of :meth:`postRestart`, which means that
@ -427,7 +427,7 @@ actors may look up other actors by specifying absolute or relative
paths—logical or physical—and receive back an :class:`ActorSelection` with the
result:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#selection-local
.. includecode:: code/docs/actorlambda/ActorDocTest.java#selection-local
.. note::
@ -453,7 +453,7 @@ structure, i.e. the supervisor.
The path elements of an actor selection may contain wildcard patterns allowing for
broadcasting of messages to that section:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#selection-wildcard
.. includecode:: code/docs/actorlambda/ActorDocTest.java#selection-wildcard
Messages can be sent via the :class:`ActorSelection` and the path of the
:class:`ActorSelection` is looked up when delivering each message. If the selection
@ -469,8 +469,8 @@ actors which are traversed in the sense that if a concrete name lookup fails
negative result is generated. Please note that this does not mean that delivery
of that reply is guaranteed, it still is a normal message.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#import-identify
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#identify
.. includecode:: code/docs/actorlambda/ActorDocTest.java#import-identify
.. includecode:: code/docs/actorlambda/ActorDocTest.java#identify
You can also acquire an :class:`ActorRef` for an :class:`ActorSelection` with
the ``resolveOne`` method of the :class:`ActorSelection`. It returns a ``Future``
@ -480,7 +480,7 @@ didn't complete within the supplied `timeout`.
Remote actor addresses may also be looked up, if :ref:`remoting <remoting-java>` is enabled:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#selection-remote
.. includecode:: code/docs/actorlambda/ActorDocTest.java#selection-remote
An example demonstrating actor look-up is given in :ref:`remote-sample-java`.
@ -537,7 +537,7 @@ Tell: Fire-forget
This is the preferred way of sending messages. No blocking waiting for a
message. This gives the best concurrency and scalability characteristics.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#tell
.. includecode:: code/docs/actorlambda/ActorDocTest.java#tell
The sender reference is passed along with the message and available within the
receiving actor via its :meth:`sender()` method while processing this
@ -577,7 +577,7 @@ more below.
To complete the future with an exception you need send a Failure message to the sender.
This is *not done automatically* when an actor throws an exception while processing a message.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#reply-exception
.. includecode:: code/docs/actorlambda/ActorDocTest.java#reply-exception
If the actor does not complete the future, it will expire after the timeout period,
specified as parameter to the ``ask`` method; this will complete the
@ -608,7 +608,7 @@ original sender address/reference is maintained even though the message is going
through a 'mediator'. This can be useful when writing actors that work as
routers, load-balancers, replicators etc.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#forward
.. includecode:: code/docs/actorlambda/ActorDocTest.java#forward
Receive messages
================
@ -616,13 +616,13 @@ Receive messages
An Actor either has to set its initial receive behavior in the constructor by
calling the :meth:`receive` method in the :class:`AbstractActor`:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java
.. includecode:: code/docs/actorlambda/ActorDocTest.java
:include: receive-constructor
:exclude: and-some-behavior
or by implementing the :meth:`receive` method in the :class:`Actor` interface:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#receive
.. includecode:: code/docs/actorlambda/ActorDocTest.java#receive
Both the argument to the :class:`AbstractActor` :meth:`receive` method and the return
type of the :class:`Actor` :meth:`receive` method is a ``PartialFunction<Object, BoxedUnit>``
@ -634,7 +634,7 @@ function there is a builder named ``ReceiveBuilder`` that you can use.
Here is an example:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/MyActor.java
.. includecode:: code/docs/actorlambda/MyActor.java
:include: imports,my-actor
.. _LambdaActor.Reply:
@ -649,7 +649,7 @@ for replying later, or passing on to other actors. If there is no sender (a
message was sent without an actor or future context) then the sender
defaults to a 'dead-letter' actor ref.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/MyActor.java#reply
.. includecode:: code/docs/actorlambda/MyActor.java#reply
Receive timeout
@ -667,7 +667,7 @@ timeout there must have been an idle period beforehand as configured via this me
Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity
periods). Pass in `Duration.Undefined` to switch off this feature.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#receive-timeout
.. includecode:: code/docs/actorlambda/ActorDocTest.java#receive-timeout
.. _stopping-actors-lambda:
@ -704,7 +704,7 @@ whole system.
The :meth:`postStop()` hook is invoked after an actor is fully stopped. This
enables cleaning up of resources:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#postStop
.. includecode:: code/docs/actorlambda/ActorDocTest.java#postStop
:exclude: clean-up-some-resources
.. note::
@ -735,7 +735,7 @@ termination of several actors:
.. includecode:: code/docs/actor/UntypedActorDocTest.java#gracefulStop
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#gracefulStop-actor
.. includecode:: code/docs/actorlambda/ActorDocTest.java#gracefulStop-actor
When ``gracefulStop()`` returns successfully, the actors ``postStop()`` hook
will have been executed: there exists a happens-before edge between the end of
@ -775,7 +775,7 @@ popped.
To hotswap the Actor behavior using ``become``:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#hot-swap-actor
.. includecode:: code/docs/actorlambda/ActorDocTest.java#hot-swap-actor
This variant of the :meth:`become` method is useful for many different things,
such as to implement a Finite State Machine (FSM, for an example see `Dining
@ -791,7 +791,7 @@ of “pop” operations (i.e. :meth:`unbecome`) matches the number of “push”
in the long run, otherwise this amounts to a memory leak (which is why this
behavior is not the default).
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#swapper
.. includecode:: code/docs/actorlambda/ActorDocTest.java#swapper
Stash
@ -816,7 +816,7 @@ order as they have been received originally. An actor that extends
Here is an example of the ``AbstractActorWithStash`` class in action:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#stash
.. includecode:: code/docs/actorlambda/ActorDocTest.java#stash
Invoking ``stash()`` adds the current message (the message that the
actor received last) to the actor's stash. It is typically invoked
@ -949,7 +949,7 @@ for example in the presence of circular dependencies. In this case the actor sho
and use ``become()`` or a finite state-machine state transition to encode the initialized and uninitialized states
of the actor.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/InitializationDocTest.java#messageInit
.. includecode:: code/docs/actorlambda/InitializationDocTest.java#messageInit
If the actor may receive messages before it has been initialized, a useful tool can be the ``Stash`` to save messages
until the initialization finishes, and replaying them after the actor became initialized.

View file

@ -49,5 +49,5 @@ Step Description
Full Source Code of the Fault Tolerance Sample
------------------------------------------------------
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java#all
.. includecode:: code/docs/actorlambda/japi/FaultHandlingDocSample.java#all

View file

@ -32,7 +32,7 @@ in more depth.
For the sake of demonstration let us consider the following strategy:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
.. includecode:: code/docs/actorlambda/FaultHandlingTest.java
:include: strategy
I have chosen a few well-known exception types in order to demonstrate the
@ -109,49 +109,49 @@ Test Application
The following section shows the effects of the different directives in practice,
wherefor a test setup is needed. First off, we need a suitable supervisor:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
.. includecode:: code/docs/actorlambda/FaultHandlingTest.java
:include: supervisor
This supervisor will be used to create a child, with which we can experiment:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
.. includecode:: code/docs/actorlambda/FaultHandlingTest.java
:include: child
The test is easier by using the utilities described in :ref:`akka-testkit`,
where ``TestProbe`` provides an actor ref useful for receiving and inspecting replies.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
.. includecode:: code/docs/actorlambda/FaultHandlingTest.java
:include: testkit
Let us create actors:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
.. includecode:: code/docs/actorlambda/FaultHandlingTest.java
:include: create
The first test shall demonstrate the ``Resume`` directive, so we try it out by
setting some non-initial state in the actor and have it fail:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
.. includecode:: code/docs/actorlambda/FaultHandlingTest.java
:include: resume
As you can see the value 42 survives the fault handling directive. Now, if we
change the failure to a more serious ``NullPointerException``, that will no
longer be the case:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
.. includecode:: code/docs/actorlambda/FaultHandlingTest.java
:include: restart
And finally in case of the fatal ``IllegalArgumentException`` the child will be
terminated by the supervisor:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
.. includecode:: code/docs/actorlambda/FaultHandlingTest.java
:include: stop
Up to now the supervisor was completely unaffected by the childs failure,
because the directives set did handle it. In case of an ``Exception``, this is not
true anymore and the supervisor escalates the failure.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
.. includecode:: code/docs/actorlambda/FaultHandlingTest.java
:include: escalate-kill
The supervisor itself is supervised by the top-level actor provided by the
@ -164,12 +164,12 @@ child not to survive this failure.
In case this is not desired (which depends on the use case), we need to use a
different supervisor which overrides this behavior.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
.. includecode:: code/docs/actorlambda/FaultHandlingTest.java
:include: supervisor2
With this parent, the child survives the escalated restart, as demonstrated in
the last test:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
.. includecode:: code/docs/actorlambda/FaultHandlingTest.java
:include: escalate-restart

View file

@ -38,11 +38,11 @@ send them on after the burst ended or a flush request is received.
First, consider all of the below to use these import statements:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java#simple-imports
.. includecode:: code/docs/actorlambda/fsm/Buncher.java#simple-imports
The contract of our “Buncher” actor is that it accepts or produces the following messages:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Events.java
.. includecode:: code/docs/actorlambda/fsm/Events.java
:include: simple-events
:exclude: boilerplate
@ -53,7 +53,7 @@ The contract of our “Buncher” actor is that it accepts or produces the follo
The actor can be in two states: no message queued (aka ``Idle``) or some
message queued (aka ``Active``). The states and the state data is defined like this:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java
.. includecode:: code/docs/actorlambda/fsm/Buncher.java
:include: simple-state
:exclude: boilerplate
@ -64,7 +64,7 @@ reference to send the batches to and the actual queue of messages.
Now lets take a look at the skeleton for our FSM actor:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java
.. includecode:: code/docs/actorlambda/fsm/Buncher.java
:include: simple-fsm
:exclude: transition-elided,unhandled-elided
@ -93,7 +93,7 @@ shall work identically in both states, we make use of the fact that any event
which is not handled by the ``when()`` block is passed to the
``whenUnhandled()`` block:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java#unhandled-elided
.. includecode:: code/docs/actorlambda/fsm/Buncher.java#unhandled-elided
The first case handled here is adding ``Queue()`` requests to the internal
queue and going to the ``Active`` state (this does the obvious thing of staying
@ -107,7 +107,7 @@ target, for which we use the ``onTransition`` mechanism: you can declare
multiple such blocks and all of them will be tried for matching behavior in
case a state transition occurs (i.e. only when the state actually changes).
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java#transition-elided
.. includecode:: code/docs/actorlambda/fsm/Buncher.java#transition-elided
The transition callback is a partial function which takes as input a pair of
states—the current and the next state. During the state change, the old state
@ -117,7 +117,7 @@ available as ``nextStateData``.
To verify that this buncher actually works, it is quite easy to write a test
using the :ref:`akka-testkit`, here using JUnit as an example:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/BuncherTest.java
.. includecode:: code/docs/actorlambda/fsm/BuncherTest.java
:include: test-code
Reference
@ -129,7 +129,7 @@ The AbstractFSM Class
The :class:`AbstractFSM` abstract class is the base class used to implement an FSM. It implements
Actor since an Actor is created to drive the FSM.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java
.. includecode:: code/docs/actorlambda/fsm/Buncher.java
:include: simple-fsm
:exclude: fsm-body
@ -181,7 +181,7 @@ The :meth:`stateFunction` argument is a :class:`PartialFunction[Event, State]`,
which is conveniently given using the state function builder syntax as
demonstrated below:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java
.. includecode:: code/docs/actorlambda/fsm/Buncher.java
:include: when-syntax
.. warning::
@ -193,7 +193,7 @@ It is recommended practice to declare the states as an enum and then verify that
``when`` clause for each of the states. If you want to leave the handling of a state
“unhandled” (more below), it still needs to be declared like this:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java#NullFunction
.. includecode:: code/docs/actorlambda/fsm/FSMDocTest.java#NullFunction
Defining the Initial State
--------------------------
@ -213,7 +213,7 @@ If a state doesn't handle a received event a warning is logged. If you want to
do something else in this case you can specify that with
:func:`whenUnhandled(stateFunction)`:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
.. includecode:: code/docs/actorlambda/fsm/FSMDocTest.java
:include: unhandled-syntax
Within this handler the state of the FSM may be queried using the
@ -257,7 +257,7 @@ of the modifiers described in the following:
All modifiers can be chained to achieve a nice and concise description:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
.. includecode:: code/docs/actorlambda/fsm/FSMDocTest.java
:include: modifier-syntax
The parentheses are not actually needed in all cases, but they visually
@ -294,14 +294,14 @@ The handler is a partial function which takes a pair of states as input; no
resulting state is needed as it is not possible to modify the transition in
progress.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
.. includecode:: code/docs/actorlambda/fsm/FSMDocTest.java
:include: transition-syntax
It is also possible to pass a function object accepting two states to
:func:`onTransition`, in case your transition handling logic is implemented as
a method:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
.. includecode:: code/docs/actorlambda/fsm/FSMDocTest.java
:include: alt-transition-syntax
The handlers registered with this method are stacked, so you can intersperse
@ -377,14 +377,14 @@ state data which is available during termination handling.
the same way as a state transition (but note that the ``return`` statement
may not be used within a :meth:`when` block).
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
.. includecode:: code/docs/actorlambda/fsm/FSMDocTest.java
:include: stop-syntax
You can use :func:`onTermination(handler)` to specify custom code that is
executed when the FSM is stopped. The handler is a partial function which takes
a :class:`StopEvent(reason, stateName, stateData)` as argument:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
.. includecode:: code/docs/actorlambda/fsm/FSMDocTest.java
:include: termination-syntax
As for the :func:`whenUnhandled` case, this handler is not stacked, so each
@ -418,7 +418,7 @@ Event Tracing
The setting ``akka.actor.debug.fsm`` in :ref:`configuration` enables logging of an
event trace by :class:`LoggingFSM` instances:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
.. includecode:: code/docs/actorlambda/fsm/FSMDocTest.java
:include: logging-fsm
:exclude: body-elided
@ -439,7 +439,7 @@ The :class:`AbstractLoggingFSM` class adds one more feature to the FSM: a rollin
log which may be used during debugging (for tracing how the FSM entered a
certain failure state) or for other creative uses:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
.. includecode:: code/docs/actorlambda/fsm/FSMDocTest.java
:include: logging-fsm
The :meth:`logDepth` defaults to zero, which turns off the event log.