+act,sam,doc #3940 Added receive setter for Java Lambda actors

* Added a setter for Java lambda actors to "hide" the not so nice looking type signature of the "receive" method.
* Updated docs to reflect the changes.
* Converted samples to use the new setter.
This commit is contained in:
Björn Antonsson 2014-03-20 12:05:32 +01:00
parent 30ae25a90c
commit 723c931d16
23 changed files with 683 additions and 570 deletions

View file

@ -8,10 +8,16 @@ import akka.actor.*;
import akka.event.LoggingAdapter;
import akka.event.Logging;
import akka.japi.pf.ReceiveBuilder;
import akka.testkit.ErrorFilter;
import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import static docs.actor.Messages.Swap.Swap;
import static docs.actor.Messages.*;
import static akka.japi.Util.immutableSeq;
import java.util.concurrent.TimeUnit;
@ -43,11 +49,19 @@ import static akka.pattern.Patterns.gracefulStop;
public class ActorDocTest {
public static Config config = ConfigFactory.parseString(
"akka {\n" +
" loggers = [\"akka.testkit.TestEventListener\"]\n" +
" loglevel = \"WARNING\"\n" +
" stdout-loglevel = \"WARNING\"\n" +
"}\n"
);
static ActorSystem system = null;
@BeforeClass
public static void beforeClass() {
system = ActorSystem.create("ActorDocTest");
system = ActorSystem.create("ActorDocTest", config);
}
@AfterClass
@ -61,18 +75,27 @@ public class ActorDocTest {
public class FirstActor extends AbstractActor {
final ActorRef child = context().actorOf(Props.create(MyActor.class), "myChild");
//#plus-some-behavior
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public FirstActor() {
receive(ReceiveBuilder.
matchAny(x -> {
sender().tell(x, self());
}).build();
}).build()
);
}
//#plus-some-behavior
}
//#context-actorOf
static public abstract class ReceiveActor extends AbstractActor {
static public abstract class SomeActor extends AbstractActor {
//#receive-constructor
public SomeActor() {
receive(ReceiveBuilder.
//#and-some-behavior
match(String.class, s -> System.out.println(s.toLowerCase())).
//#and-some-behavior
build());
}
//#receive-constructor
@Override
//#receive
public abstract PartialFunction<Object, BoxedUnit> receive();
@ -84,12 +107,9 @@ public class ActorDocTest {
ActorWithArgs(String args) {
this.args = args;
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchAny(x -> { }).build();
receive(ReceiveBuilder.
matchAny(x -> { }).build()
);
}
}
@ -112,14 +132,11 @@ public class ActorDocTest {
DemoActor(Integer magicNumber) {
this.magicNumber = magicNumber;
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
receive(ReceiveBuilder.
match(Integer.class, i -> {
sender().tell(i + magicNumber, self());
}).build();
}).build()
);
}
}
@ -131,9 +148,8 @@ public class ActorDocTest {
ActorRef demoActor = context().actorOf(DemoActor.props(42), "demo");
// ...
//#props-factory
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return emptyBehavior();
public SomeOtherActor() {
receive(emptyBehavior());
}
//#props-factory
}
@ -141,15 +157,15 @@ public class ActorDocTest {
public static class Hook extends AbstractActor {
ActorRef target = null;
public Hook() {
receive(emptyBehavior());
}
//#preStart
@Override
public void preStart() {
target = context().actorOf(Props.create(MyActor.class, "target"));
}
//#preStart
public PartialFunction<Object, BoxedUnit> receive() {
return emptyBehavior();
}
//#postStop
@Override
public void postStop() {
@ -191,9 +207,8 @@ public class ActorDocTest {
}
public static class ReplyException extends AbstractActor {
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public ReplyException() {
receive(ReceiveBuilder.
matchAny(x -> {
//#reply-exception
try {
@ -204,7 +219,8 @@ public class ActorDocTest {
throw e;
}
//#reply-exception
}).build();
}).build()
);
}
private String operation() {
@ -223,16 +239,16 @@ public class ActorDocTest {
private ActorRef worker =
context().watch(context().actorOf(Props.create(Cruncher.class), "worker"));
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public Manager() {
receive(ReceiveBuilder.
matchEquals("job", s -> {
worker.tell("crunch", self());
}).
matchEquals(SHUTDOWN, x -> {
worker.tell(PoisonPill.getInstance(), self());
context().become(shuttingDown);
}).build();
}).build()
);
}
public PartialFunction<Object, BoxedUnit> shuttingDown =
@ -263,29 +279,27 @@ public class ActorDocTest {
public static class Cruncher extends AbstractActor {
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchEquals("crunch", s -> { }).build();
public Cruncher() {
receive(ReceiveBuilder.
matchEquals("crunch", s -> { }).build()
);
}
}
static
//#swapper
public class Swapper extends AbstractActor {
final LoggingAdapter log = Logging.getLogger(context().system(), this);
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public class Swapper extends AbstractLoggingActor {
public Swapper() {
receive(ReceiveBuilder.
matchEquals(Swap, s -> {
log.info("Hi");
log().info("Hi");
context().become(ReceiveBuilder.
matchEquals(Swap, x -> {
log.info("Ho");
log().info("Ho");
context().unbecome(); // resets the latest 'become' (just for fun)
}).build(), false); // push on top instead of replace
}).build();
}).build()
);
}
}
@ -312,7 +326,7 @@ public class ActorDocTest {
public void creatingActorWithSystemActorOf() {
//#system-actorOf
// ActorSystem is a heavy object: create only one per application
final ActorSystem system = ActorSystem.create("MySystem");
final ActorSystem system = ActorSystem.create("MySystem", config);
final ActorRef myActor = system.actorOf(Props.create(MyActor.class), "myactor");
//#system-actorOf
try {
@ -357,17 +371,14 @@ public class ActorDocTest {
static
//#receive-timeout
public class ReceiveTimeoutActor extends AbstractActor {
public ReceiveTimeoutActor() {
// To set an initial delay
context().setReceiveTimeout(Duration.create("10 seconds"));
}
//#receive-timeout
ActorRef target = context().system().deadLetters();
//#receive-timeout
public ReceiveTimeoutActor() {
// To set an initial delay
context().setReceiveTimeout(Duration.create("10 seconds"));
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
receive(ReceiveBuilder.
matchEquals("Hello", s -> {
// To set in a response to a message
context().setReceiveTimeout(Duration.create("1 second"));
@ -382,7 +393,8 @@ public class ActorDocTest {
//#receive-timeout
target.tell("timeout", self());
//#receive-timeout
}).build();
}).build()
);
}
}
//#receive-timeout
@ -405,7 +417,7 @@ public class ActorDocTest {
private PartialFunction<Object, BoxedUnit> angry;
private PartialFunction<Object, BoxedUnit> happy;
{
public HotSwapActor() {
angry =
ReceiveBuilder.
matchEquals("foo", s -> {
@ -422,16 +434,15 @@ public class ActorDocTest {
matchEquals("foo", s -> {
context().become(angry);
}).build();
}
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
receive(ReceiveBuilder.
matchEquals("foo", s -> {
context().become(angry);
}).
matchEquals("bar", s -> {
context().become(happy);
}).build();
}).build()
);
}
}
//#hot-swap-actor
@ -459,8 +470,8 @@ public class ActorDocTest {
static
//#stash
public class ActorWithProtocol extends AbstractActorWithStash {
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public ActorWithProtocol() {
receive(ReceiveBuilder.
matchEquals("open", s -> {
context().become(ReceiveBuilder.
matchEquals("write", ws -> { /* do writing */ }).
@ -470,7 +481,8 @@ public class ActorDocTest {
}).
matchAny(msg -> stash()).build(), false);
}).
matchAny(msg -> stash()).build();
matchAny(msg -> stash()).build()
);
}
}
//#stash
@ -484,20 +496,20 @@ public class ActorDocTest {
//#watch
public class WatchActor extends AbstractActor {
private final ActorRef child = context().actorOf(Props.empty(), "target");
{
context().watch(child); // <-- this is the only call needed for registration
}
private ActorRef lastSender = system.deadLetters();
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public WatchActor() {
context().watch(child); // <-- this is the only call needed for registration
receive(ReceiveBuilder.
matchEquals("kill", s -> {
context().stop(child);
lastSender = sender();
}).
match(Terminated.class, t -> t.actor().equals(child), t -> {
lastSender.tell("finished", self());
}).build();
}).build()
);
}
}
//#watch
@ -519,13 +531,11 @@ public class ActorDocTest {
public class Follower extends AbstractActor {
final Integer identifyId = 1;
{
public Follower(){
ActorSelection selection = context().actorSelection("/user/another");
selection.tell(new Identify(identifyId), self());
}
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
receive(ReceiveBuilder.
match(ActorIdentity.class, id -> id.getRef() != null, id -> {
ActorRef ref = id.getRef();
context().watch(ref);
@ -533,7 +543,8 @@ public class ActorDocTest {
}).
match(ActorIdentity.class, id -> id.getRef() == null, id -> {
context().stop(self());
}).build();
}).build()
);
}
final PartialFunction<Object, BoxedUnit> active(final ActorRef another) {
@ -558,4 +569,73 @@ public class ActorDocTest {
}
};
}
public static class NoReceiveActor extends AbstractActor {
}
@Test
public void noReceiveActor() {
EventFilter ex1 = new ErrorFilter(ActorInitializationException.class);
EventFilter[] ignoreExceptions = { ex1 };
try {
system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
new JavaTestKit(system) {{
final ActorRef victim = new EventFilter<ActorRef>(ActorInitializationException.class) {
protected ActorRef run() {
return system.actorOf(Props.create(NoReceiveActor.class), "victim");
}
}.message("Actor behavior has not been set with receive(...)").occurrences(1).exec();
assertEquals(true, victim.isTerminated());
}};
} finally {
system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions)));
}
}
public static class MultipleReceiveActor extends AbstractActor {
public MultipleReceiveActor() {
receive(ReceiveBuilder.
match(String.class, s1 -> s1.toLowerCase().equals("become"), s1 -> {
sender().tell(s1.toUpperCase(), self());
receive(ReceiveBuilder.
match(String.class, s2 -> {
sender().tell(s2.toLowerCase(), self());
}).build()
);
}).
match(String.class, s1 -> {
sender().tell(s1.toUpperCase(), self());
}).build()
);
}
}
@Test
public void multipleReceiveActor() {
EventFilter ex1 = new ErrorFilter(IllegalActorStateException.class);
EventFilter[] ignoreExceptions = { ex1 };
try {
system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
new JavaTestKit(system) {{
new EventFilter<Boolean>(IllegalActorStateException.class) {
protected Boolean run() {
ActorRef victim = system.actorOf(Props.create(MultipleReceiveActor.class), "victim2");
victim.tell("Foo", getRef());
expectMsgEquals("FOO");
victim.tell("bEcoMe", getRef());
expectMsgEquals("BECOME");
victim.tell("Foo", getRef());
// if it's upper case, then the actor was restarted
expectMsgEquals("FOO");
return true;
}
}.message("Actor behavior has already been set with receive(...), " +
"use context().become(...) to change it later").occurrences(1).exec();
}};
} finally {
system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions)));
}
}
}

View file

@ -64,12 +64,12 @@ public class FaultHandlingTest {
//#strategy
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public Supervisor() {
receive(ReceiveBuilder.
match(Props.class, props -> {
sender().tell(context().actorOf(props), self());
}).build();
}).build()
);
}
}
@ -94,12 +94,12 @@ public class FaultHandlingTest {
//#strategy2
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public Supervisor2() {
receive(ReceiveBuilder.
match(Props.class, props -> {
sender().tell(context().actorOf(props), self());
}).build();
}).build()
);
}
@Override
@ -115,12 +115,12 @@ public class FaultHandlingTest {
public class Child extends AbstractActor {
int state = 0;
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public Child() {
receive(ReceiveBuilder.
match(Exception.class, exception -> { throw exception; }).
match(Integer.class, i -> state = i).
matchEquals("get", s -> sender().tell(state, self())).build();
matchEquals("get", s -> sender().tell(state, self())).build()
);
}
}

View file

@ -32,21 +32,21 @@ public class InitializationDocTest {
}
public static class MessageInitExample extends AbstractActor {
//#messageInit
private String initializeMe = null;
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public MessageInitExample() {
//#messageInit
receive(ReceiveBuilder.
matchEquals("init", m1 -> {
initializeMe = "Up and running";
context().become(ReceiveBuilder.
matchEquals("U OK?", m2 -> {
sender().tell(initializeMe, self());
}).build());
}).build();
}).build()
//#messageInit
);
}
//#messageInit
}
@Test

View file

@ -9,8 +9,6 @@ import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
//#imports
@ -18,9 +16,8 @@ import scala.runtime.BoxedUnit;
public class MyActor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(context().system(), this);
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public MyActor() {
receive(ReceiveBuilder.
match(String.class, s -> {
log.info("Received String message: {}", s);
//#my-actor
@ -29,7 +26,8 @@ public class MyActor extends AbstractActor {
//#reply
//#my-actor
}).
matchAny(o -> log.info("received unknown message")).build();
matchAny(o -> log.info("received unknown message")).build()
);
}
}
//#my-actor

View file

@ -18,9 +18,8 @@ public class SampleActor extends AbstractActor {
context().unbecome();
}).build();
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public SampleActor() {
receive(ReceiveBuilder.
match(Double.class, d -> {
sender().tell(d.isNaN() ? 0 : d, self());
}).
@ -30,7 +29,8 @@ public class SampleActor extends AbstractActor {
match(String.class, s -> s.startsWith("guard"), s -> {
sender().tell("startsWith(guard): " + s.toUpperCase(), self());
context().become(guarded, false);
}).build();
}).build()
);
}
}
//#sample-actor

View file

@ -73,9 +73,8 @@ public class FaultHandlingDocSample {
context().setReceiveTimeout(Duration.create("15 seconds"));
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
public Listener() {
receive(LoggingReceive.create(ReceiveBuilder.
match(Progress.class, progress -> {
log().info("Current progress: {} %", progress.percent);
if (progress.percent >= 100.0) {
@ -87,7 +86,8 @@ public class FaultHandlingDocSample {
// No progress within 15 seconds, ServiceUnavailable
log().error("Shutting down due to unavailable service");
context().system().shutdown();
}).build(), context());
}).build(), context()
));
}
}
@ -137,9 +137,8 @@ public class FaultHandlingDocSample {
return strategy;
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
public Worker() {
receive(LoggingReceive.create(ReceiveBuilder.
matchEquals(Start, x -> progressListener == null, x -> {
progressListener = sender();
context().system().scheduler().schedule(
@ -160,7 +159,8 @@ public class FaultHandlingDocSample {
}
}, context().dispatcher()), context().dispatcher())
.to(progressListener);
}).build(), context());
}).build(), context())
);
}
}
@ -266,9 +266,8 @@ public class FaultHandlingDocSample {
storage.tell(new Get(key), self());
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
public CounterService() {
receive(LoggingReceive.create(ReceiveBuilder.
match(Entry.class, entry -> entry.key.equals(key) && counter == null, entry -> {
// Reply from Storage of the initial value, now we can create the Counter
final long value = entry.value;
@ -301,7 +300,8 @@ public class FaultHandlingDocSample {
matchEquals(Reconnect, o -> {
// Re-establish storage after the scheduled delay
initStorage();
}).build(), context());
}).build(), context())
);
}
void forwardOrPlaceInBacklog(Object msg) {
@ -348,11 +348,8 @@ public class FaultHandlingDocSample {
public Counter(String key, long initialValue) {
this.key = key;
this.count = initialValue;
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
receive(LoggingReceive.create(ReceiveBuilder.
match(UseStorage.class, useStorage -> {
storage = useStorage.storage;
storeCount();
@ -363,7 +360,8 @@ public class FaultHandlingDocSample {
}).
matchEquals(GetCurrentCount, gcc -> {
sender().tell(new CurrentCount(key, count), self());
}).build(), context());
}).build(), context())
);
}
void storeCount() {
@ -435,9 +433,8 @@ public class FaultHandlingDocSample {
final DummyDB db = DummyDB.instance;
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
public Storage() {
receive(LoggingReceive.create(ReceiveBuilder.
match(Store.class, store -> {
db.save(store.entry.key, store.entry.value);
}).
@ -445,7 +442,8 @@ public class FaultHandlingDocSample {
Long value = db.load(get.key);
sender().tell(new Entry(get.key, value == null ?
Long.valueOf(0L) : value), self());
}).build(), context());
}).build(), context())
);
}
}