Updated documentation of Actors (Java). See #1435

* Aligned the Java and Scala documentation for Actors
* Implemented hotswap samples in Java, and documented in same way as Scala docs
* Improved Actors (Scala) docs
* Fixed wrong preRestart and postRestart in UntypedActor
* Changed name of Dispatchers.fromConfig to newFromConfig and made it Java friendly
* Added ActorRef.ask with Timeout parameter in addition to the timeoutMillis
This commit is contained in:
Patrik Nordwall 2011-12-08 14:06:20 +01:00
parent b4f486667f
commit ce128740ab
17 changed files with 848 additions and 327 deletions

View file

@ -0,0 +1,17 @@
package akka.docs.actor;
import akka.actor.ActorRef;
import static akka.actor.Actors.*;
import akka.actor.UntypedActor;
//#context-actorOf
public class FirstUntypedActor extends UntypedActor {
ActorRef myActor = getContext().actorOf(MyActor.class);
//#context-actorOf
public void onReceive(Object message) {
myActor.forward(message, getContext());
myActor.tell(poisonPill());
}
}

View file

@ -0,0 +1,25 @@
package akka.docs.actor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
//#immutable-message
public class ImmutableMessage {
private final int sequenceNumber;
private final List<String> values;
public ImmutableMessage(int sequenceNumber, List<String> values) {
this.sequenceNumber = sequenceNumber;
this.values = Collections.unmodifiableList(new ArrayList<String>(values));
}
public int getSequenceNumber() {
return sequenceNumber;
}
public List<String> getValues() {
return values;
}
}
//#immutable-message

View file

@ -0,0 +1,26 @@
package akka.docs.actor;
//#receive-timeout
import akka.actor.Actors;
import akka.actor.ReceiveTimeout;
import akka.actor.UnhandledMessageException;
import akka.actor.UntypedActor;
import akka.util.Duration;
public class MyReceivedTimeoutUntypedActor extends UntypedActor {
public MyReceivedTimeoutUntypedActor() {
getContext().setReceiveTimeout(Duration.parse("30 seconds"));
}
public void onReceive(Object message) throws Exception {
if (message.equals("Hello")) {
getSender().tell("Hello world");
} else if (message == Actors.receiveTimeout()) {
throw new RuntimeException("received timeout");
} else {
throw new UnhandledMessageException(message, getSelf());
}
}
}
//#receive-timeout

View file

@ -0,0 +1,20 @@
package akka.docs.actor;
//#my-untyped-actor
import akka.actor.UntypedActor;
import akka.actor.UnhandledMessageException;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MyUntypedActor extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public void onReceive(Object message) throws Exception {
if (message instanceof String)
log.info("Received String message: {}", message);
else
throw new UnhandledMessageException(message, getSelf());
}
}
//#my-untyped-actor

View file

@ -0,0 +1,53 @@
package akka.docs.actor;
import static akka.docs.actor.UntypedActorSwapper.Swap.SWAP;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.UnhandledMessageException;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Procedure;
//#swapper
public class UntypedActorSwapper {
public static class Swap {
public static Swap SWAP = new Swap();
private Swap() {
}
}
public static class Swapper extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public void onReceive(Object message) throws Exception {
if (message == SWAP) {
log.info("Hi");
getContext().become(new Procedure<Object>() {
@Override
public void apply(Object message) {
log.info("Ho");
getContext().unbecome(); // resets the latest 'become' (just for fun)
}
});
} else {
throw new UnhandledMessageException(message, getSelf());
}
}
}
public static void main(String... args) {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef swap = system.actorOf(Swapper.class);
swap.tell(SWAP); // logs Hi
swap.tell(SWAP); // logs Ho
swap.tell(SWAP); // logs Hi
swap.tell(SWAP); // logs Ho
swap.tell(SWAP); // logs Hi
swap.tell(SWAP); // logs Ho
}
}
//#swapper

View file

@ -0,0 +1,5 @@
package akka.docs.actor
import org.scalatest.junit.JUnitSuite
class UntypedActorTest extends UntypedActorTestBase with JUnitSuite

View file

@ -0,0 +1,239 @@
package akka.docs.actor;
//#imports
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
//#imports
//#import-future
import akka.dispatch.Future;
//#import-future
//#import-actors
import static akka.actor.Actors.*;
//#import-actors
//#import-procedure
import akka.japi.Procedure;
//#import-procedure
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.dispatch.MessageDispatcher;
import org.junit.Test;
import scala.Option;
import static org.junit.Assert.*;
public class UntypedActorTestBase {
@Test
public void systemActorOf() {
//#system-actorOf
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(MyUntypedActor.class);
//#system-actorOf
myActor.tell("test");
system.stop();
}
@Test
public void contextActorOf() {
//#context-actorOf
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(MyUntypedActor.class);
//#context-actorOf
myActor.tell("test");
system.stop();
}
@Test
public void constructorActorOf() {
ActorSystem system = ActorSystem.create("MySystem");
//#creating-constructor
// allows passing in arguments to the MyActor constructor
ActorRef myActor = system.actorOf(new UntypedActorFactory() {
public UntypedActor create() {
return new MyActor("...");
}
});
//#creating-constructor
myActor.tell("test");
system.stop();
}
@Test
public void propsActorOf() {
ActorSystem system = ActorSystem.create("MySystem");
//#creating-props
MessageDispatcher dispatcher = system.dispatcherFactory().newFromConfig("my-dispatcher");
ActorRef myActor = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher(dispatcher),
"myactor");
//#creating-props
myActor.tell("test");
system.stop();
}
@Test
public void usingAsk() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new UntypedActorFactory() {
public UntypedActor create() {
return new MyAskActor();
}
});
//#using-ask
Future future = myActor.ask("Hello", 1000);
future.await();
if (future.isCompleted()) {
Option resultOption = future.result();
if (resultOption.isDefined()) {
Object result = resultOption.get();
// ...
} else {
//... whatever
}
}
//#using-ask
system.stop();
}
@Test
public void receiveTimeout() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(MyReceivedTimeoutUntypedActor.class);
myActor.tell("Hello");
system.stop();
}
@Test
public void usePoisonPill() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(MyUntypedActor.class);
//#poison-pill
myActor.tell(poisonPill());
//#poison-pill
system.stop();
}
@Test
public void useKill() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef victim = system.actorOf(MyUntypedActor.class);
//#kill
victim.tell(kill());
//#kill
system.stop();
}
@Test
public void useBecome() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new UntypedActorFactory() {
public UntypedActor create() {
return new HotSwapActor();
}
});
myActor.tell("foo");
myActor.tell("bar");
myActor.tell("bar");
system.stop();
}
public static class MyActor extends UntypedActor {
public MyActor(String s) {
}
public void onReceive(Object message) throws Exception {
try {
operation();
} catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e));
throw e;
}
}
private void operation() {
}
//#lifecycle-callbacks
public void preStart() {
}
public void preRestart(Throwable reason, Option<Object> message) {
postStop();
}
public void postRestart(Throwable reason) {
preStart();
}
public void postStop() {
}
//#lifecycle-callbacks
}
public static class MyAskActor extends UntypedActor {
public void onReceive(Object message) throws Exception {
//#reply-exception
try {
String result = operation();
getSender().tell(result);
} catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e));
throw e;
}
//#reply-exception
}
private String operation() {
return "Hi";
}
}
//#hot-swap-actor
public static class HotSwapActor extends UntypedActor {
Procedure<Object> angry = new Procedure<Object>() {
@Override
public void apply(Object message) {
if (message.equals("foo")) {
getSender().tell("I am already angry?");
} else if (message.equals("foo")) {
getContext().become(happy);
}
}
};
Procedure<Object> happy = new Procedure<Object>() {
@Override
public void apply(Object message) {
if (message.equals("bar")) {
getSender().tell("I am already happy :-)");
} else if (message.equals("foo")) {
getContext().become(angry);
}
}
};
public void onReceive(Object message) {
if (message.equals("bar")) {
getContext().become(angry);
} else if (message.equals("foo")) {
getContext().become(happy);
}
}
}
//#hot-swap-actor
}