improve AbstractActor, #21717

* Receive class that wraps PartialFunction, to avoid
  scary scala types
* move AbstractActorContext to AbstractActor.ActorContext
* converting docs, many, many UntypedActor
* removing UntypedActor docs
* add unit test for ReceiveBuilder
* MiMa filters
* consistent use of getContext(), self(), sender()
* rename cross references
* migration guide
* skip samples for now
* improve match type safetyi, add matchUnchecked
  * the `? extends P` caused code like this to compile:
    `match(String.class, (Integer i) -> {})`
  * added matchUnchecked, since it can still be useful (um, convenient)
    to be able to do:
    `matchUnchecked(List.class, (List<String> list) -> {})`
* eleminate some scala.Option
  * preRestart
  * findChild
  * ActorIdentity.getActorRef
This commit is contained in:
Patrik Nordwall 2016-12-13 10:59:29 +01:00
parent 3617fe8b41
commit 4bd6b7aab1
157 changed files with 3290 additions and 8882 deletions

View file

@ -12,19 +12,23 @@ import akka.testkit.TestEvent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import docs.AbstractJavaTest;
import docs.actor.ActorDocTest.FirstActor;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import static docs.actorlambda.Messages.Swap.Swap;
import static docs.actorlambda.Messages.*;
import static akka.japi.Util.immutableSeq;
import akka.actor.CoordinatedShutdown;
import static akka.pattern.PatternsCS.ask;
import akka.util.Timeout;
import akka.Done;
import java.util.concurrent.CompletionStage;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import akka.testkit.TestActors;
import akka.dispatch.Mapper;
import akka.dispatch.Futures;
import akka.util.Timeout;
import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
@ -44,13 +48,25 @@ import akka.actor.ActorIdentity;
import akka.actor.ActorSelection;
import akka.actor.Identify;
//#import-identify
//#import-graceFulStop
//#import-ask
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.util.Timeout;
//#import-ask
//#import-gracefulStop
import akka.pattern.AskTimeoutException;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
//#import-ask
import scala.concurrent.Future;
import static akka.pattern.Patterns.gracefulStop;
//#import-graceFulStop
//#import-ask
//#import-gracefulStop
//#import-terminated
import akka.actor.Terminated;
//#import-terminated
public class ActorDocTest extends AbstractJavaTest {
@ -71,49 +87,111 @@ public class ActorDocTest extends AbstractJavaTest {
@AfterClass
public static void afterClass() throws Exception {
Await.ready(system.terminate(), Duration.create("5 seconds"));
Await.ready(system.terminate(), Duration.create(5, TimeUnit.SECONDS));
}
static
//#context-actorOf
public class FirstActor extends AbstractActor {
final ActorRef child = context().actorOf(Props.create(MyActor.class), "myChild");
final ActorRef child = getContext().actorOf(Props.create(MyActor.class), "myChild");
//#plus-some-behavior
public FirstActor() {
receive(ReceiveBuilder.
matchAny(x -> {
sender().tell(x, self());
}).build()
);
@Override
public Receive createReceive() {
return receiveBuilder()
.matchAny(x -> sender().tell(x, self()))
.build();
}
//#plus-some-behavior
}
//#context-actorOf
static public abstract class SomeActor extends AbstractActor {
//#receive-constructor
public SomeActor() {
receive(ReceiveBuilder.
//#and-some-behavior
match(String.class, s -> System.out.println(s.toLowerCase())).
//#and-some-behavior
build());
}
//#receive-constructor
static public class SomeActor extends AbstractActor {
//#createReceive
@Override
//#receive
public abstract PartialFunction<Object, BoxedUnit> receive();
//#receive
public Receive createReceive() {
return receiveBuilder()
.match(String.class, s -> System.out.println(s.toLowerCase()))
.build();
}
//#createReceive
}
static
//#well-structured
public class WellStructuredActor extends AbstractActor {
public static class Msg1 {}
public static class Msg2 {}
public static class Msg3 {}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Msg1.class, this::receiveMsg1)
.match(Msg2.class, this::receiveMsg2)
.match(Msg3.class, this::receiveMsg3)
.build();
}
private void receiveMsg1(Msg1 msg) {
// actual work
}
private void receiveMsg2(Msg2 msg) {
// actual work
}
private void receiveMsg3(Msg3 msg) {
// actual work
}
}
//#well-structured
static
//#optimized
public class OptimizedActor extends UntypedAbstractActor {
public static class Msg1 {}
public static class Msg2 {}
public static class Msg3 {}
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof Msg1)
receiveMsg1((Msg1) msg);
else if (msg instanceof Msg1)
receiveMsg2((Msg2) msg);
else if (msg instanceof Msg3)
receiveMsg3((Msg3) msg);
else
unhandled(msg);
}
private void receiveMsg1(Msg1 msg) {
// actual work
}
private void receiveMsg2(Msg2 msg) {
// actual work
}
private void receiveMsg3(Msg3 msg) {
// actual work
}
}
//#optimized
static public class ActorWithArgs extends AbstractActor {
private final String args;
ActorWithArgs(String args) {
public ActorWithArgs(String args) {
this.args = args;
receive(ReceiveBuilder.
matchAny(x -> { }).build()
);
}
@Override
public Receive createReceive() {
return receiveBuilder().matchAny(x -> { }).build();
}
}
@ -133,14 +211,18 @@ public class ActorDocTest extends AbstractJavaTest {
}
private final Integer magicNumber;
DemoActor(Integer magicNumber) {
public DemoActor(Integer magicNumber) {
this.magicNumber = magicNumber;
receive(ReceiveBuilder.
match(Integer.class, i -> {
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Integer.class, i -> {
sender().tell(i + magicNumber, self());
}).build()
);
})
.build();
}
}
@ -149,11 +231,12 @@ public class ActorDocTest extends AbstractJavaTest {
//#props-factory
public class SomeOtherActor extends AbstractActor {
// Props(new DemoActor(42)) would not be safe
ActorRef demoActor = context().actorOf(DemoActor.props(42), "demo");
ActorRef demoActor = getContext().actorOf(DemoActor.props(42), "demo");
// ...
//#props-factory
public SomeOtherActor() {
receive(emptyBehavior());
@Override
public Receive createReceive() {
return AbstractActor.emptyBehavior();
}
//#props-factory
}
@ -174,26 +257,65 @@ public class ActorDocTest extends AbstractJavaTest {
return from;
}
}
DemoMessagesActor() {
receive(ReceiveBuilder.
match(Greeting.class, g -> {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Greeting.class, g -> {
log().info("I was greeted by {}", g.getGreeter());
}).build()
);
};
})
.build();
}
}
//#messages-in-companion
public static class LifecycleMethods extends AbstractActor {
@Override
public Receive createReceive() {
return AbstractActor.emptyBehavior();
}
/*
* This section must be kept in sync with the actual Actor trait.
*
* BOYSCOUT RULE: whenever you read this, verify that!
*/
//#lifecycle-callbacks
public void preStart() {
}
public void preRestart(Throwable reason, Optional<Object> message) {
for (ActorRef each : getContext().getChildren()) {
getContext().unwatch(each);
getContext().stop(each);
}
postStop();
}
public void postRestart(Throwable reason) {
preStart();
}
public void postStop() {
}
//#lifecycle-callbacks
}
public static class Hook extends AbstractActor {
ActorRef target = null;
public Hook() {
receive(emptyBehavior());
@Override
public Receive createReceive() {
return AbstractActor.emptyBehavior();
}
//#preStart
@Override
public void preStart() {
target = context().actorOf(Props.create(MyActor.class, "target"));
target = getContext().actorOf(Props.create(MyActor.class, "target"));
}
//#preStart
//#postStop
@ -207,7 +329,7 @@ public class ActorDocTest extends AbstractJavaTest {
//#tell
final Object result = "";
//#forward
target.forward(result, context());
target.forward(result, getContext());
//#forward
target = null;
//#clean-up-some-resources
@ -218,28 +340,30 @@ public class ActorDocTest extends AbstractJavaTest {
public void compileSelections() {
//#selection-local
// will look up this absolute path
context().actorSelection("/user/serviceA/actor");
getContext().actorSelection("/user/serviceA/actor");
// will look up sibling beneath same supervisor
context().actorSelection("../joe");
getContext().actorSelection("../joe");
//#selection-local
//#selection-wildcard
// will look all children to serviceB with names starting with worker
context().actorSelection("/user/serviceB/worker*");
getContext().actorSelection("/user/serviceB/worker*");
// will look up all siblings beneath same supervisor
context().actorSelection("../*");
getContext().actorSelection("../*");
//#selection-wildcard
//#selection-remote
context().actorSelection("akka.tcp://app@otherhost:1234/user/serviceB");
getContext().actorSelection("akka.tcp://app@otherhost:1234/user/serviceB");
//#selection-remote
}
}
public static class ReplyException extends AbstractActor {
public ReplyException() {
receive(ReceiveBuilder.
matchAny(x -> {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchAny(x -> {
//#reply-exception
try {
String result = operation();
@ -249,8 +373,8 @@ public class ActorDocTest extends AbstractJavaTest {
throw e;
}
//#reply-exception
}).build()
);
})
.build();
}
private String operation() {
@ -267,28 +391,31 @@ public class ActorDocTest extends AbstractJavaTest {
public static final Shutdown SHUTDOWN = Shutdown.Shutdown;
private ActorRef worker =
context().watch(context().actorOf(Props.create(Cruncher.class), "worker"));
getContext().watch(getContext().actorOf(Props.create(Cruncher.class), "worker"));
public Manager() {
receive(ReceiveBuilder.
matchEquals("job", s -> {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("job", s -> {
worker.tell("crunch", self());
}).
matchEquals(SHUTDOWN, x -> {
})
.matchEquals(SHUTDOWN, x -> {
worker.tell(PoisonPill.getInstance(), self());
context().become(shuttingDown);
}).build()
);
getContext().become(shuttingDown());
})
.build();
}
public PartialFunction<Object, BoxedUnit> shuttingDown =
ReceiveBuilder.
matchEquals("job", s -> {
sender().tell("service unavailable, shutting down", self());
}).
match(Terminated.class, t -> t.actor().equals(worker), t -> {
context().stop(self());
}).build();
private AbstractActor.Receive shuttingDown() {
return receiveBuilder()
.matchEquals("job", s ->
sender().tell("service unavailable, shutting down", self())
)
.match(Terminated.class, t -> t.actor().equals(worker), t ->
getContext().stop(self())
)
.build();
}
}
//#gracefulStop-actor
@ -309,27 +436,26 @@ public class ActorDocTest extends AbstractJavaTest {
public static class Cruncher extends AbstractActor {
public Cruncher() {
receive(ReceiveBuilder.
matchEquals("crunch", s -> { }).build()
);
@Override
public Receive createReceive() {
return receiveBuilder().matchEquals("crunch", s -> { }).build();
}
}
static
//#swapper
public class Swapper extends AbstractLoggingActor {
public Swapper() {
receive(ReceiveBuilder.
matchEquals(Swap, s -> {
log().info("Hi");
context().become(ReceiveBuilder.
matchEquals(Swap, x -> {
log().info("Ho");
context().unbecome(); // resets the latest 'become' (just for fun)
}).build(), false); // push on top instead of replace
}).build()
);
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals(Swap, s -> {
log().info("Hi");
getContext().become(receiveBuilder().
matchEquals(Swap, x -> {
log().info("Ho");
getContext().unbecome(); // resets the latest 'become' (just for fun)
}).build(), false); // push on top instead of replace
}).build();
}
}
@ -418,29 +544,32 @@ public class ActorDocTest extends AbstractJavaTest {
//#receive-timeout
public class ReceiveTimeoutActor extends AbstractActor {
//#receive-timeout
ActorRef target = context().system().deadLetters();
ActorRef target = getContext().system().deadLetters();
//#receive-timeout
public ReceiveTimeoutActor() {
// To set an initial delay
context().setReceiveTimeout(Duration.create("10 seconds"));
receive(ReceiveBuilder.
matchEquals("Hello", s -> {
getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS));
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("Hello", s -> {
// To set in a response to a message
context().setReceiveTimeout(Duration.create("1 second"));
getContext().setReceiveTimeout(Duration.create(1, TimeUnit.SECONDS));
//#receive-timeout
target = sender();
target.tell("Hello world", self());
//#receive-timeout
}).
match(ReceiveTimeout.class, r -> {
})
.match(ReceiveTimeout.class, r -> {
// To turn it off
context().setReceiveTimeout(Duration.Undefined());
getContext().setReceiveTimeout(Duration.Undefined());
//#receive-timeout
target.tell("timeout", self());
//#receive-timeout
}).build()
);
})
.build();
}
}
//#receive-timeout
@ -460,35 +589,40 @@ public class ActorDocTest extends AbstractJavaTest {
static
//#hot-swap-actor
public class HotSwapActor extends AbstractActor {
private PartialFunction<Object, BoxedUnit> angry;
private PartialFunction<Object, BoxedUnit> happy;
private AbstractActor.Receive angry;
private AbstractActor.Receive happy;
public HotSwapActor() {
angry =
ReceiveBuilder.
matchEquals("foo", s -> {
receiveBuilder()
.matchEquals("foo", s -> {
sender().tell("I am already angry?", self());
}).
matchEquals("bar", s -> {
context().become(happy);
}).build();
})
.matchEquals("bar", s -> {
getContext().become(happy);
})
.build();
happy = ReceiveBuilder.
matchEquals("bar", s -> {
happy = receiveBuilder()
.matchEquals("bar", s -> {
sender().tell("I am already happy :-)", self());
}).
matchEquals("foo", s -> {
context().become(angry);
}).build();
receive(ReceiveBuilder.
matchEquals("foo", s -> {
context().become(angry);
}).
matchEquals("bar", s -> {
context().become(happy);
}).build()
);
})
.matchEquals("foo", s -> {
getContext().become(angry);
})
.build();
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("foo", s ->
getContext().become(angry)
)
.matchEquals("bar", s ->
getContext().become(happy)
)
.build();
}
}
//#hot-swap-actor
@ -516,19 +650,21 @@ public class ActorDocTest extends AbstractJavaTest {
static
//#stash
public class ActorWithProtocol extends AbstractActorWithStash {
public ActorWithProtocol() {
receive(ReceiveBuilder.
matchEquals("open", s -> {
context().become(ReceiveBuilder.
matchEquals("write", ws -> { /* do writing */ }).
matchEquals("close", cs -> {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("open", s -> {
getContext().become(receiveBuilder()
.matchEquals("write", ws -> { /* do writing */ })
.matchEquals("close", cs -> {
unstashAll();
context().unbecome();
}).
matchAny(msg -> stash()).build(), false);
}).
matchAny(msg -> stash()).build()
);
getContext().unbecome();
})
.matchAny(msg -> stash())
.build(), false);
})
.matchAny(msg -> stash())
.build();
}
}
//#stash
@ -541,21 +677,24 @@ public class ActorDocTest extends AbstractJavaTest {
static
//#watch
public class WatchActor extends AbstractActor {
private final ActorRef child = context().actorOf(Props.empty(), "target");
private final ActorRef child = getContext().actorOf(Props.empty(), "target");
private ActorRef lastSender = system.deadLetters();
public WatchActor() {
context().watch(child); // <-- this is the only call needed for registration
receive(ReceiveBuilder.
matchEquals("kill", s -> {
context().stop(child);
getContext().watch(child); // <-- this is the only call needed for registration
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("kill", s -> {
getContext().stop(child);
lastSender = sender();
}).
match(Terminated.class, t -> t.actor().equals(child), t -> {
})
.match(Terminated.class, t -> t.actor().equals(child), t -> {
lastSender.tell("finished", self());
}).build()
);
})
.build();
}
}
//#watch
@ -578,26 +717,30 @@ public class ActorDocTest extends AbstractJavaTest {
final Integer identifyId = 1;
public Follower(){
ActorSelection selection = context().actorSelection("/user/another");
ActorSelection selection = getContext().actorSelection("/user/another");
selection.tell(new Identify(identifyId), self());
receive(ReceiveBuilder.
match(ActorIdentity.class, id -> id.getRef() != null, id -> {
ActorRef ref = id.getRef();
context().watch(ref);
context().become(active(ref));
}).
match(ActorIdentity.class, id -> id.getRef() == null, id -> {
context().stop(self());
}).build()
);
}
final PartialFunction<Object, BoxedUnit> active(final ActorRef another) {
return ReceiveBuilder.
match(Terminated.class, t -> t.actor().equals(another), t -> {
context().stop(self());
}).build();
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ActorIdentity.class, id -> id.getActorRef().isPresent(), id -> {
ActorRef ref = id.getActorRef().get();
getContext().watch(ref);
getContext().become(active(ref));
})
.match(ActorIdentity.class, id -> !id.getActorRef().isPresent(), id -> {
getContext().stop(self());
})
.build();
}
final AbstractActor.Receive active(final ActorRef another) {
return receiveBuilder()
.match(Terminated.class, t -> t.actor().equals(another), t ->
getContext().stop(self())
)
.build();
}
}
//#identify
@ -615,73 +758,70 @@ public class ActorDocTest extends AbstractJavaTest {
}
};
}
public static class NoReceiveActor extends AbstractActor {
}
@Test
public void noReceiveActor() {
EventFilter ex1 = new ErrorFilter(ActorInitializationException.class);
EventFilter[] ignoreExceptions = { ex1 };
try {
system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
new JavaTestKit(system) {{
final ActorRef victim = new EventFilter<ActorRef>(ActorInitializationException.class) {
protected ActorRef run() {
return system.actorOf(Props.create(NoReceiveActor.class), "victim");
}
}.message("Actor behavior has not been set with receive(...)").occurrences(1).exec();
public void usePatternsAskPipe() {
new JavaTestKit(system) {
{
ActorRef actorA = system.actorOf(TestActors.echoActorProps());
ActorRef actorB = system.actorOf(TestActors.echoActorProps());
ActorRef actorC = getRef();
assertEquals(true, victim.isTerminated());
}};
} finally {
system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions)));
}
//#ask-pipe
final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>();
futures.add(ask(actorA, "request", 1000)); // using 1000ms timeout
futures.add(ask(actorB, "another request", t)); // using timeout from
// above
final Future<Iterable<Object>> aggregate = Futures.sequence(futures,
system.dispatcher());
final Future<Result> transformed = aggregate.map(
new Mapper<Iterable<Object>, Result>() {
public Result apply(Iterable<Object> coll) {
final Iterator<Object> it = coll.iterator();
final String x = (String) it.next();
final String s = (String) it.next();
return new Result(x, s);
}
}, system.dispatcher());
pipe(transformed, system.dispatcher()).to(actorC);
//#ask-pipe
expectMsgEquals(new Result("request", "another request"));
}
};
}
public static class MultipleReceiveActor extends AbstractActor {
public MultipleReceiveActor() {
receive(ReceiveBuilder.
match(String.class, s1 -> s1.toLowerCase().equals("become"), s1 -> {
sender().tell(s1.toUpperCase(), self());
receive(ReceiveBuilder.
match(String.class, s2 -> {
sender().tell(s2.toLowerCase(), self());
}).build()
);
}).
match(String.class, s1 -> {
sender().tell(s1.toUpperCase(), self());
}).build()
);
}
}
@Test
public void multipleReceiveActor() {
EventFilter ex1 = new ErrorFilter(IllegalActorStateException.class);
EventFilter[] ignoreExceptions = { ex1 };
try {
system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
new JavaTestKit(system) {{
new EventFilter<Boolean>(IllegalActorStateException.class) {
protected Boolean run() {
ActorRef victim = system.actorOf(Props.create(MultipleReceiveActor.class), "victim2");
victim.tell("Foo", getRef());
expectMsgEquals("FOO");
victim.tell("bEcoMe", getRef());
expectMsgEquals("BECOME");
victim.tell("Foo", getRef());
// if it's upper case, then the actor was restarted
expectMsgEquals("FOO");
return true;
}
}.message("Actor behavior has already been set with receive(...), " +
"use context().become(...) to change it later").occurrences(1).exec();
}};
} finally {
system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions)));
}
public void useKill() {
new JavaTestKit(system) {
{
ActorRef victim = system.actorOf(TestActors.echoActorProps());
watch(victim);
//#kill
victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender());
//#kill
expectTerminated(Duration.create(3, TimeUnit.SECONDS), victim);
}
};
}
@Test
public void usePoisonPill() {
new JavaTestKit(system) {
{
ActorRef victim = system.actorOf(TestActors.echoActorProps());
watch(victim);
//#poison-pill
victim.tell(akka.actor.PoisonPill.getInstance(), ActorRef.noSender());
//#poison-pill
expectTerminated(Duration.create(3, TimeUnit.SECONDS), victim);
}
};
}
@Test
@ -691,7 +831,7 @@ public class ActorDocTest extends AbstractJavaTest {
CoordinatedShutdown.get(system).addTask(
CoordinatedShutdown.PhaseBeforeServiceUnbind(), "someTaskName",
() -> {
return ask(someActor, "stop", new Timeout(5, TimeUnit.SECONDS))
return akka.pattern.PatternsCS.ask(someActor, "stop", new Timeout(5, TimeUnit.SECONDS))
.thenApply(reply -> Done.getInstance());
});
//#coordinated-shutdown-addTask