add preprocessor for RST docs, see #2461 and #2431

The idea is to filter the sources, replacing @<var>@ occurrences with
the mapping for <var> (which is currently hard-coded). @@ -> @. In order
to make this work, I had to move the doc sources one directory down
(into akka-docs/rst) so that the filtered result could be in a sibling
directory so that relative links (to _sphinx plugins or real code) would
continue to work.

While I was at it I also changed it so that WARNINGs and ERRORs are not
swallowed into the debug dump anymore but printed at [warn] level
(minimum).

One piece of fallout is that the (online) html build is now run after
the normal one, not in parallel.
This commit is contained in:
Roland 2012-09-21 10:47:58 +02:00
parent c0f60da8cc
commit 9bc01ae265
266 changed files with 270 additions and 182 deletions

View file

@ -0,0 +1,8 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor
import org.scalatest.junit.JUnitSuite
class FSMDocTest extends FSMDocTestBase with JUnitSuite

View file

@ -0,0 +1,194 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
//#imports-data
import java.util.ArrayList;
import java.util.List;
import akka.actor.ActorRef;
//#imports-data
//#imports-actor
import akka.event.LoggingAdapter;
import akka.event.Logging;
import akka.actor.UntypedActor;
//#imports-actor
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.TestProbe;
import akka.testkit.AkkaSpec;
public class FSMDocTestBase {
//#data
public static final class SetTarget {
final ActorRef ref;
public SetTarget(ActorRef ref) {
this.ref = ref;
}
}
public static final class Queue {
final Object o;
public Queue(Object o) {
this.o = o;
}
}
public static final Object flush = new Object();
public static final class Batch {
final List<Object> objects;
public Batch(List<Object> objects) {
this.objects = objects;
}
}
//#data
//#base
static abstract class MyFSMBase extends UntypedActor {
/*
* This is the mutable state of this state machine.
*/
protected enum State {
IDLE, ACTIVE;
}
private State state = State.IDLE;
private ActorRef target;
private List<Object> queue;
/*
* Then come all the mutator methods:
*/
protected void init(ActorRef target) {
this.target = target;
queue = new ArrayList<Object>();
}
protected void setState(State s) {
if (state != s) {
transition(state, s);
state = s;
}
}
protected void enqueue(Object o) {
if (queue != null)
queue.add(o);
}
protected List<Object> drainQueue() {
final List<Object> q = queue;
if (q == null)
throw new IllegalStateException("drainQueue(): not yet initialized");
queue = new ArrayList<Object>();
return q;
}
/*
* Here are the interrogation methods:
*/
protected boolean isInitialized() {
return target != null;
}
protected State getState() {
return state;
}
protected ActorRef getTarget() {
if (target == null)
throw new IllegalStateException("getTarget(): not yet initialized");
return target;
}
/*
* And finally the callbacks (only one in this example: react to state change)
*/
abstract protected void transition(State old, State next);
}
//#base
//#actor
static public class MyFSM extends MyFSMBase {
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@Override
public void onReceive(Object o) {
if (getState() == State.IDLE) {
if (o instanceof SetTarget)
init(((SetTarget) o).ref);
else
whenUnhandled(o);
} else if (getState() == State.ACTIVE) {
if (o == flush)
setState(State.IDLE);
else
whenUnhandled(o);
}
}
@Override
public void transition(State old, State next) {
if (old == State.ACTIVE) {
getTarget().tell(new Batch(drainQueue()), getSelf());
}
}
private void whenUnhandled(Object o) {
if (o instanceof Queue && isInitialized()) {
enqueue(((Queue) o).o);
setState(State.ACTIVE);
} else {
log.warning("received unknown message {} in state {}", o, getState());
}
}
}
//#actor
ActorSystem system;
@org.junit.Before
public void setUp() {
system = ActorSystem.create("FSMSystem", AkkaSpec.testConf());
}
@org.junit.Test
public void mustBunch() {
final ActorRef buncher = system.actorOf(new Props(MyFSM.class));
final TestProbe probe = new TestProbe(system);
buncher.tell(new SetTarget(probe.ref()), null);
buncher.tell(new Queue(1), null);
buncher.tell(new Queue(2), null);
buncher.tell(flush, null);
buncher.tell(new Queue(3), null);
final Batch b = probe.expectMsgClass(Batch.class);
assert b.objects.size() == 2;
assert b.objects.contains(1);
assert b.objects.contains(2);
}
@org.junit.After
public void cleanup() {
system.shutdown();
}
}

View file

@ -0,0 +1,7 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor
import org.scalatest.junit.JUnitSuite
class FaultHandlingTest extends FaultHandlingTestBase with JUnitSuite

View file

@ -0,0 +1,214 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
//#testkit
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.SupervisorStrategy;
import static akka.actor.SupervisorStrategy.*;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
import scala.concurrent.util.Duration;
import akka.testkit.AkkaSpec;
import akka.testkit.TestProbe;
//#testkit
import akka.testkit.ErrorFilter;
import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import static java.util.concurrent.TimeUnit.SECONDS;
import akka.japi.Function;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import org.junit.Test;
import org.junit.BeforeClass;
import org.junit.AfterClass;
//#testkit
public class FaultHandlingTestBase {
//#testkit
//#supervisor
static public class Supervisor extends UntypedActor {
//#strategy
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof ArithmeticException) {
return resume();
} else if (t instanceof NullPointerException) {
return restart();
} else if (t instanceof IllegalArgumentException) {
return stop();
} else {
return escalate();
}
}
});
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
//#strategy
public void onReceive(Object o) {
if (o instanceof Props) {
getSender().tell(getContext().actorOf((Props) o), getSelf());
} else {
unhandled(o);
}
}
}
//#supervisor
//#supervisor2
static public class Supervisor2 extends UntypedActor {
//#strategy2
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof ArithmeticException) {
return resume();
} else if (t instanceof NullPointerException) {
return restart();
} else if (t instanceof IllegalArgumentException) {
return stop();
} else {
return escalate();
}
}
});
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
//#strategy2
public void onReceive(Object o) {
if (o instanceof Props) {
getSender().tell(getContext().actorOf((Props) o), getSelf());
} else {
unhandled(o);
}
}
@Override
public void preRestart(Throwable cause, Option<Object> msg) {
// do not kill all children, which is the default here
}
}
//#supervisor2
//#child
static public class Child extends UntypedActor {
int state = 0;
public void onReceive(Object o) throws Exception {
if (o instanceof Exception) {
throw (Exception) o;
} else if (o instanceof Integer) {
state = (Integer) o;
} else if (o.equals("get")) {
getSender().tell(state, getSelf());
} else {
unhandled(o);
}
}
}
//#child
//#testkit
static ActorSystem system;
Duration timeout = Duration.create(5, SECONDS);
@BeforeClass
public static void start() {
system = ActorSystem.create("test", AkkaSpec.testConf());
}
@AfterClass
public static void cleanup() {
system.shutdown();
}
@Test
public void mustEmploySupervisorStrategy() throws Exception {
// code here
//#testkit
EventFilter ex1 = (EventFilter) new ErrorFilter(ArithmeticException.class);
EventFilter ex2 = (EventFilter) new ErrorFilter(NullPointerException.class);
EventFilter ex3 = (EventFilter) new ErrorFilter(IllegalArgumentException.class);
EventFilter ex4 = (EventFilter) new ErrorFilter(Exception.class);
Seq<EventFilter> ignoreExceptions = seq(ex1, ex2, ex3, ex4);
system.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
//#create
Props superprops = new Props(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
//#create
//#resume
child.tell(42, null);
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
child.tell(new ArithmeticException(), null);
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
//#resume
//#restart
child.tell(new NullPointerException(), null);
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
//#restart
//#stop
final TestProbe probe = new TestProbe(system);
probe.watch(child);
child.tell(new IllegalArgumentException(), null);
probe.expectMsgClass(Terminated.class);
//#stop
//#escalate-kill
child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
probe.watch(child);
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception(), null);
probe.expectMsgClass(Terminated.class);
//#escalate-kill
//#escalate-restart
superprops = new Props(Supervisor2.class);
supervisor = system.actorOf(superprops);
child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
child.tell(23, null);
assert Await.result(ask(child, "get", 5000), timeout).equals(23);
child.tell(new Exception(), null);
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
//#escalate-restart
//#testkit
}
//#testkit
public <A> Seq<A> seq(A... args) {
return JavaConverters.collectionAsScalaIterableConverter(java.util.Arrays.asList(args)).asScala().toSeq();
}
//#testkit
}
//#testkit

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.PoisonPill;
import akka.actor.UntypedActor;
//#context-actorOf
public class FirstUntypedActor extends UntypedActor {
ActorRef myActor = getContext().actorOf(new Props(MyActor.class), "myactor");
//#context-actorOf
public void onReceive(Object message) {
myActor.forward(message, getContext());
myActor.tell(PoisonPill.getInstance(), null);
}
}

View file

@ -0,0 +1,28 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
//#immutable-message
public class ImmutableMessage {
private final int sequenceNumber;
private final List<String> values;
public ImmutableMessage(int sequenceNumber, List<String> values) {
this.sequenceNumber = sequenceNumber;
this.values = Collections.unmodifiableList(new ArrayList<String>(values));
}
public int getSequenceNumber() {
return sequenceNumber;
}
public List<String> getValues() {
return values;
}
}
//#immutable-message

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
//#receive-timeout
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import scala.concurrent.util.Duration;
public class MyReceivedTimeoutUntypedActor extends UntypedActor {
public MyReceivedTimeoutUntypedActor() {
getContext().setReceiveTimeout(Duration.parse("30 seconds"));
}
public void onReceive(Object message) {
if (message.equals("Hello")) {
getSender().tell("Hello world", getSelf());
} else if (message == ReceiveTimeout.getInstance()) {
throw new RuntimeException("received timeout");
} else {
unhandled(message);
}
}
}
//#receive-timeout

View file

@ -0,0 +1,22 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
//#my-untyped-actor
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MyUntypedActor extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public void onReceive(Object message) throws Exception {
if (message instanceof String)
log.info("Received String message: {}", message);
else
unhandled(message);
}
}
//#my-untyped-actor

View file

@ -0,0 +1,8 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor
import org.scalatest.junit.JUnitSuite
class SchedulerDocTest extends SchedulerDocTestBase with JUnitSuite

View file

@ -0,0 +1,89 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
//#imports1
import akka.actor.Props;
import scala.concurrent.util.Duration;
import java.util.concurrent.TimeUnit;
//#imports1
//#imports2
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.actor.Cancellable;
//#imports2
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.AkkaSpec;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class SchedulerDocTestBase {
ActorSystem system;
ActorRef testActor;
@Before
public void setUp() {
system = ActorSystem.create("MySystem", AkkaSpec.testConf());
testActor = system.actorOf(new Props(MyUntypedActor.class));
}
@After
public void tearDown() {
system.shutdown();
}
@Test
public void scheduleOneOffTask() {
//#schedule-one-off-message
//Schedules to send the "foo"-message to the testActor after 50ms
system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), testActor, "foo", system.dispatcher());
//#schedule-one-off-message
//#schedule-one-off-thunk
//Schedules a Runnable to be executed (send the current time) to the testActor after 50ms
system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), new Runnable() {
@Override
public void run() {
testActor.tell(System.currentTimeMillis(), null);
}
}, system.dispatcher());
//#schedule-one-off-thunk
}
@Test
public void scheduleRecurringTask() {
//#schedule-recurring
ActorRef tickActor = system.actorOf(new Props().withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new UntypedActor() {
public void onReceive(Object message) {
if (message.equals("Tick")) {
// Do someting
} else {
unhandled(message);
}
}
};
}
}));
//This will schedule to send the Tick-message
//to the tickActor after 0ms repeating every 50ms
Cancellable cancellable = system.scheduler().schedule(Duration.Zero(), Duration.create(50, TimeUnit.MILLISECONDS),
tickActor, "Tick", system.dispatcher());
//This cancels further Ticks to be sent
cancellable.cancel();
//#schedule-recurring
system.stop(tickActor);
}
}

View file

@ -0,0 +1,8 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor
import org.scalatest.junit.JUnitSuite
class TypedActorDocTest extends TypedActorDocTestBase with JUnitSuite

View file

@ -0,0 +1,187 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
//#imports
import akka.actor.TypedActor;
import akka.actor.*;
import akka.japi.*;
import akka.dispatch.Futures;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.util.Duration;
import java.util.concurrent.TimeUnit;
//#imports
import java.lang.Exception;
import org.junit.Test;
import static org.junit.Assert.*;
public class TypedActorDocTestBase {
Object someReference = null;
ActorSystem system = null;
//#typed-actor-iface
public static interface Squarer {
//#typed-actor-iface-methods
void squareDontCare(int i); //fire-forget
Future<Integer> square(int i); //non-blocking send-request-reply
Option<Integer> squareNowPlease(int i);//blocking send-request-reply
int squareNow(int i); //blocking send-request-reply
//#typed-actor-iface-methods
}
//#typed-actor-iface
//#typed-actor-impl
static class SquarerImpl implements Squarer {
private String name;
public SquarerImpl() {
this.name = "default";
}
public SquarerImpl(String name) {
this.name = name;
}
//#typed-actor-impl-methods
public void squareDontCare(int i) {
int sq = i * i; //Nobody cares :(
}
public Future<Integer> square(int i) {
return Futures.successful(i * i);
}
public Option<Integer> squareNowPlease(int i) {
return Option.some(i * i);
}
public int squareNow(int i) {
return i * i;
}
//#typed-actor-impl-methods
}
//#typed-actor-impl
@Test public void mustGetTheTypedActorExtension() {
try {
//#typed-actor-extension-tools
//Returns the Typed Actor Extension
TypedActorExtension extension =
TypedActor.get(system); //system is an instance of ActorSystem
//Returns whether the reference is a Typed Actor Proxy or not
TypedActor.get(system).isTypedActor(someReference);
//Returns the backing Akka Actor behind an external Typed Actor Proxy
TypedActor.get(system).getActorRefFor(someReference);
//Returns the current ActorContext,
// method only valid within methods of a TypedActor implementation
ActorContext context = TypedActor.context();
//Returns the external proxy of the current Typed Actor,
// method only valid within methods of a TypedActor implementation
Squarer sq = TypedActor.<Squarer>self();
//Returns a contextual instance of the Typed Actor Extension
//this means that if you create other Typed Actors with this,
//they will become children to the current Typed Actor.
TypedActor.get(TypedActor.context());
//#typed-actor-extension-tools
} catch (Exception e) {
//dun care
}
}
@Test public void createATypedActor() {
try {
//#typed-actor-create1
Squarer mySquarer =
TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class));
//#typed-actor-create1
//#typed-actor-create2
Squarer otherSquarer =
TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class,
new Creator<SquarerImpl>() {
public SquarerImpl create() { return new SquarerImpl("foo"); }
}),
"name");
//#typed-actor-create2
//#typed-actor-calls
//#typed-actor-call-oneway
mySquarer.squareDontCare(10);
//#typed-actor-call-oneway
//#typed-actor-call-future
Future<Integer> fSquare = mySquarer.square(10); //A Future[Int]
//#typed-actor-call-future
//#typed-actor-call-option
Option<Integer> oSquare = mySquarer.squareNowPlease(10); //Option[Int]
//#typed-actor-call-option
//#typed-actor-call-strict
int iSquare = mySquarer.squareNow(10); //Int
//#typed-actor-call-strict
//#typed-actor-calls
assertEquals(100, Await.result(fSquare, Duration.create(3, TimeUnit.SECONDS)).intValue());
assertEquals(100, oSquare.get().intValue());
assertEquals(100, iSquare);
//#typed-actor-stop
TypedActor.get(system).stop(mySquarer);
//#typed-actor-stop
//#typed-actor-poisonpill
TypedActor.get(system).poisonPill(otherSquarer);
//#typed-actor-poisonpill
} catch(Exception e) {
//Ignore
}
}
@Test public void createHierarchies() {
try {
//#typed-actor-hierarchy
Squarer childSquarer =
TypedActor.get(TypedActor.context()).
typedActorOf(
new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class)
);
//Use "childSquarer" as a Squarer
//#typed-actor-hierarchy
} catch (Exception e) {
//dun care
}
}
@Test public void proxyAnyActorRef() {
try {
//#typed-actor-remote
Squarer typedActor =
TypedActor.get(system).
typedActorOf(
new TypedProps<Squarer>(Squarer.class),
system.actorFor("akka://SomeSystem@somehost:2552/user/some/foobar")
);
//Use "typedActor" as a FooBar
//#typed-actor-remote
} catch (Exception e) {
//dun care
}
}
}

View file

@ -0,0 +1,8 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor
import org.scalatest.junit.JUnitSuite
class UntypedActorDocTest extends UntypedActorDocTestBase with JUnitSuite

View file

@ -0,0 +1,396 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
//#imports
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
//#imports
//#import-future
import scala.concurrent.Future;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import scala.concurrent.Await;
import scala.concurrent.util.Duration;
import akka.util.Timeout;
//#import-future
//#import-actors
import akka.actor.PoisonPill;
import akka.actor.Kill;
//#import-actors
//#import-procedure
import akka.japi.Procedure;
//#import-procedure
//#import-watch
import akka.actor.Terminated;
//#import-watch
//#import-gracefulStop
import static akka.pattern.Patterns.gracefulStop;
import scala.concurrent.Future;
import scala.concurrent.Await;
import scala.concurrent.util.Duration;
import akka.pattern.AskTimeoutException;
//#import-gracefulStop
//#import-askPipe
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
import scala.concurrent.Future;
import akka.dispatch.Futures;
import scala.concurrent.util.Duration;
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
//#import-askPipe
//#import-stash
import akka.actor.UntypedActorWithStash;
//#import-stash
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import org.junit.Test;
import scala.Option;
import java.lang.Object;
import java.util.Iterator;
import akka.pattern.Patterns;
public class UntypedActorDocTestBase {
@Test
public void createProps() {
//#creating-props-config
Props props1 = new Props();
Props props2 = new Props(MyUntypedActor.class);
Props props3 = new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new MyUntypedActor();
}
});
Props props4 = props1.withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new MyUntypedActor();
}
});
//#creating-props-config
}
@Test
public void systemActorOf() {
//#system-actorOf
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class), "myactor");
//#system-actorOf
myActor.tell("test", null);
system.shutdown();
}
@Test
public void contextActorOf() {
//#context-actorOf
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class), "myactor");
//#context-actorOf
myActor.tell("test", null);
system.shutdown();
}
@Test
public void constructorActorOf() {
ActorSystem system = ActorSystem.create("MySystem");
//#creating-constructor
// allows passing in arguments to the MyActor constructor
ActorRef myActor = system.actorOf(new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new MyActor("...");
}
}), "myactor");
//#creating-constructor
myActor.tell("test", null);
system.shutdown();
}
@Test
public void propsActorOf() {
ActorSystem system = ActorSystem.create("MySystem");
//#creating-props
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor");
//#creating-props
myActor.tell("test", null);
system.shutdown();
}
@Test
public void usingAsk() throws Exception {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new MyAskActor();
}
}), "myactor");
//#using-ask
Future<Object> future = Patterns.ask(myActor, "Hello", 1000);
Object result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
//#using-ask
system.shutdown();
}
@Test
public void receiveTimeout() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(MyReceivedTimeoutUntypedActor.class));
myActor.tell("Hello", null);
system.shutdown();
}
@Test
public void usePoisonPill() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class));
//#poison-pill
myActor.tell(PoisonPill.getInstance(), null);
//#poison-pill
system.shutdown();
}
@Test
public void useKill() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef victim = system.actorOf(new Props(MyUntypedActor.class));
//#kill
victim.tell(Kill.getInstance(), null);
//#kill
system.shutdown();
}
@Test
public void useBecome() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new HotSwapActor();
}
}));
myActor.tell("foo", null);
myActor.tell("bar", null);
myActor.tell("bar", null);
system.shutdown();
}
@Test
public void useWatch() throws Exception {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(WatchActor.class));
Future<Object> future = Patterns.ask(myActor, "kill", 1000);
assert Await.result(future, Duration.parse("1 second")).equals("finished");
system.shutdown();
}
@Test
public void usePatternsGracefulStop() throws Exception {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef actorRef = system.actorOf(new Props(MyUntypedActor.class));
//#gracefulStop
try {
Future<Boolean> stopped = gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), system);
Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));
// the actor has been stopped
} catch (AskTimeoutException e) {
// the actor wasn't stopped within 5 seconds
}
//#gracefulStop
system.shutdown();
}
class Result {
final int x;
final String s;
public Result(int x, String s) {
this.x = x;
this.s = s;
}
}
@Test
public void usePatternsAskPipe() {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef actorA = system.actorOf(new Props(MyUntypedActor.class));
ActorRef actorB = system.actorOf(new Props(MyUntypedActor.class));
ActorRef actorC = system.actorOf(new Props(MyUntypedActor.class));
//#ask-pipe
final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>();
futures.add(ask(actorA, "request", 1000)); // using 1000ms timeout
futures.add(ask(actorB, "another request", t)); // using timeout from above
final Future<Iterable<Object>> aggregate = Futures.sequence(futures, system.dispatcher());
final Future<Result> transformed = aggregate.map(new Mapper<Iterable<Object>, Result>() {
public Result apply(Iterable<Object> coll) {
final Iterator<Object> it = coll.iterator();
final String s = (String) it.next();
final int x = (Integer) it.next();
return new Result(x, s);
}
}, system.dispatcher());
pipe(transformed, system.dispatcher()).to(actorC);
//#ask-pipe
system.shutdown();
}
public static class MyActor extends UntypedActor {
public MyActor(String s) {
}
public void onReceive(Object message) throws Exception {
try {
operation();
} catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
throw e;
}
}
private void operation() {
}
//#lifecycle-callbacks
public void preStart() {
}
public void preRestart(Throwable reason, Option<Object> message) {
for (ActorRef each : getContext().getChildren())
getContext().stop(each);
postStop();
}
public void postRestart(Throwable reason) {
preStart();
}
public void postStop() {
}
//#lifecycle-callbacks
}
public static class MyAskActor extends UntypedActor {
public void onReceive(Object message) throws Exception {
//#reply-exception
try {
String result = operation();
getSender().tell(result, getSelf());
} catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
throw e;
}
//#reply-exception
}
private String operation() {
return "Hi";
}
}
//#hot-swap-actor
public static class HotSwapActor extends UntypedActor {
Procedure<Object> angry = new Procedure<Object>() {
@Override
public void apply(Object message) {
if (message.equals("bar")) {
getSender().tell("I am already angry?", getSelf());
} else if (message.equals("foo")) {
getContext().become(happy);
}
}
};
Procedure<Object> happy = new Procedure<Object>() {
@Override
public void apply(Object message) {
if (message.equals("bar")) {
getSender().tell("I am already happy :-)", getSelf());
} else if (message.equals("foo")) {
getContext().become(angry);
}
}
};
public void onReceive(Object message) {
if (message.equals("bar")) {
getContext().become(angry);
} else if (message.equals("foo")) {
getContext().become(happy);
} else {
unhandled(message);
}
}
}
//#hot-swap-actor
//#stash
public static class ActorWithProtocol extends UntypedActorWithStash {
private Boolean isOpen = false;
public void onReceive(Object msg) {
if (isOpen) {
if (msg.equals("write")) {
// do writing...
} else if (msg.equals("close")) {
unstashAll();
isOpen = false;
} else {
stash();
}
} else {
if (msg.equals("open")) {
unstashAll();
isOpen = true;
} else {
stash();
}
}
}
}
//#stash
//#watch
public static class WatchActor extends UntypedActor {
final ActorRef child = this.getContext().actorOf(Props.empty(), "child");
{
this.getContext().watch(child); // <-- this is the only call needed for registration
}
ActorRef lastSender = getContext().system().deadLetters();
@Override
public void onReceive(Object message) {
if (message.equals("kill")) {
getContext().stop(child);
lastSender = getSender();
} else if (message instanceof Terminated) {
final Terminated t = (Terminated) message;
if (t.getActor() == child) {
lastSender.tell("finished", getSelf());
}
} else {
unhandled(message);
}
}
}
//#watch
}

View file

@ -0,0 +1,56 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
import static docs.actor.UntypedActorSwapper.Swap.SWAP;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Procedure;
//#swapper
public class UntypedActorSwapper {
public static class Swap {
public static Swap SWAP = new Swap();
private Swap() {
}
}
public static class Swapper extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public void onReceive(Object message) {
if (message == SWAP) {
log.info("Hi");
getContext().become(new Procedure<Object>() {
@Override
public void apply(Object message) {
log.info("Ho");
getContext().unbecome(); // resets the latest 'become' (just for fun)
}
});
} else {
unhandled(message);
}
}
}
public static void main(String... args) {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef swap = system.actorOf(new Props(Swapper.class));
swap.tell(SWAP, null); // logs Hi
swap.tell(SWAP, null); // logs Ho
swap.tell(SWAP, null); // logs Hi
swap.tell(SWAP, null); // logs Ho
swap.tell(SWAP, null); // logs Hi
swap.tell(SWAP, null); // logs Ho
}
}
//#swapper

View file

@ -0,0 +1,479 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor.japi;
//#all
//#imports
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import akka.actor.*;
import akka.dispatch.Mapper;
import akka.japi.Function;
import scala.concurrent.util.Duration;
import akka.util.Timeout;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import static akka.japi.Util.classTag;
import static akka.actor.SupervisorStrategy.*;
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
import static docs.actor.japi.FaultHandlingDocSample.WorkerApi.*;
import static docs.actor.japi.FaultHandlingDocSample.CounterServiceApi.*;
import static docs.actor.japi.FaultHandlingDocSample.CounterApi.*;
import static docs.actor.japi.FaultHandlingDocSample.StorageApi.*;
//#imports
public class FaultHandlingDocSample {
/**
* Runs the sample
*/
public static void main(String[] args) {
Config config = ConfigFactory.parseString("akka.loglevel = DEBUG \n" + "akka.actor.debug.lifecycle = on");
ActorSystem system = ActorSystem.create("FaultToleranceSample", config);
ActorRef worker = system.actorOf(new Props(Worker.class), "worker");
ActorRef listener = system.actorOf(new Props(Listener.class), "listener");
// start the work and listen on progress
// note that the listener is used as sender of the tell,
// i.e. it will receive replies from the worker
worker.tell(Start, listener);
}
/**
* Listens on progress from the worker and shuts down the system when enough
* work has been done.
*/
public static class Listener extends UntypedActor {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@Override
public void preStart() {
// If we don't get any progress within 15 seconds then the service is unavailable
getContext().setReceiveTimeout(Duration.parse("15 seconds"));
}
public void onReceive(Object msg) {
log.debug("received message {}", msg);
if (msg instanceof Progress) {
Progress progress = (Progress) msg;
log.info("Current progress: {} %", progress.percent);
if (progress.percent >= 100.0) {
log.info("That's all, shutting down");
getContext().system().shutdown();
}
} else if (msg == ReceiveTimeout.getInstance()) {
// No progress within 15 seconds, ServiceUnavailable
log.error("Shutting down due to unavailable service");
getContext().system().shutdown();
} else {
unhandled(msg);
}
}
}
//#messages
public interface WorkerApi {
public static final Object Start = "Start";
public static final Object Do = "Do";
public static class Progress {
public final double percent;
public Progress(double percent) {
this.percent = percent;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), percent);
}
}
}
//#messages
/**
* Worker performs some work when it receives the Start message. It will
* continuously notify the sender of the Start message of current Progress.
* The Worker supervise the CounterService.
*/
public static class Worker extends UntypedActor {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
final Timeout askTimeout = new Timeout(Duration.create(5, "seconds"));
// The sender of the initial Start message will continuously be notified about progress
ActorRef progressListener;
final ActorRef counterService = getContext().actorOf(new Props(CounterService.class), "counter");
final int totalCount = 51;
// Stop the CounterService child if it throws ServiceUnavailable
private static SupervisorStrategy strategy = new OneForOneStrategy(-1, Duration.Inf(),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof ServiceUnavailable) {
return stop();
} else {
return escalate();
}
}
});
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
public void onReceive(Object msg) {
log.debug("received message {}", msg);
if (msg.equals(Start) && progressListener == null) {
progressListener = getSender();
getContext().system().scheduler().schedule(
Duration.Zero(), Duration.create(1, "second"), getSelf(), Do, getContext().dispatcher()
);
} else if (msg.equals(Do)) {
counterService.tell(new Increment(1), getSelf());
counterService.tell(new Increment(1), getSelf());
counterService.tell(new Increment(1), getSelf());
// Send current progress to the initial sender
pipe(ask(counterService, GetCurrentCount, askTimeout)
.mapTo(classTag(CurrentCount.class))
.map(new Mapper<CurrentCount, Progress>() {
public Progress apply(CurrentCount c) {
return new Progress(100.0 * c.count / totalCount);
}
}, getContext().dispatcher()), getContext().dispatcher())
.to(progressListener);
} else {
unhandled(msg);
}
}
}
//#messages
public interface CounterServiceApi {
public static final Object GetCurrentCount = "GetCurrentCount";
public static class CurrentCount {
public final String key;
public final long count;
public CurrentCount(String key, long count) {
this.key = key;
this.count = count;
}
public String toString() {
return String.format("%s(%s, %s)", getClass().getSimpleName(), key, count);
}
}
public static class Increment {
public final long n;
public Increment(long n) {
this.n = n;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), n);
}
}
public static class ServiceUnavailable extends RuntimeException {
public ServiceUnavailable(String msg) {
super(msg);
}
}
}
//#messages
/**
* Adds the value received in Increment message to a persistent counter.
* Replies with CurrentCount when it is asked for CurrentCount. CounterService
* supervise Storage and Counter.
*/
public static class CounterService extends UntypedActor {
// Reconnect message
static final Object Reconnect = "Reconnect";
private static class SenderMsgPair {
final ActorRef sender;
final Object msg;
SenderMsgPair(ActorRef sender, Object msg) {
this.msg = msg;
this.sender = sender;
}
}
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
final String key = getSelf().path().name();
ActorRef storage;
ActorRef counter;
final List<SenderMsgPair> backlog = new ArrayList<SenderMsgPair>();
final int MAX_BACKLOG = 10000;
// Restart the storage child when StorageException is thrown.
// After 3 restarts within 5 seconds it will be stopped.
private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.parse("5 seconds"),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof StorageException) {
return restart();
} else {
return escalate();
}
}
});
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
@Override
public void preStart() {
initStorage();
}
/**
* The child storage is restarted in case of failure, but after 3 restarts,
* and still failing it will be stopped. Better to back-off than
* continuously failing. When it has been stopped we will schedule a
* Reconnect after a delay. Watch the child so we receive Terminated message
* when it has been terminated.
*/
void initStorage() {
storage = getContext().watch(getContext().actorOf(new Props(Storage.class), "storage"));
// Tell the counter, if any, to use the new storage
if (counter != null)
counter.tell(new UseStorage(storage), getSelf());
// We need the initial value to be able to operate
storage.tell(new Get(key), getSelf());
}
@Override
public void onReceive(Object msg) {
log.debug("received message {}", msg);
if (msg instanceof Entry && ((Entry) msg).key.equals(key) && counter == null) {
// Reply from Storage of the initial value, now we can create the Counter
final long value = ((Entry) msg).value;
counter = getContext().actorOf(new Props().withCreator(new UntypedActorFactory() {
public Actor create() {
return new Counter(key, value);
}
}));
// Tell the counter to use current storage
counter.tell(new UseStorage(storage), getSelf());
// and send the buffered backlog to the counter
for (SenderMsgPair each : backlog) {
counter.tell(each.msg, each.sender);
}
backlog.clear();
} else if (msg instanceof Increment) {
forwardOrPlaceInBacklog(msg);
} else if (msg.equals(GetCurrentCount)) {
forwardOrPlaceInBacklog(msg);
} else if (msg instanceof Terminated) {
// After 3 restarts the storage child is stopped.
// We receive Terminated because we watch the child, see initStorage.
storage = null;
// Tell the counter that there is no storage for the moment
counter.tell(new UseStorage(null), getSelf());
// Try to re-establish storage after while
getContext().system().scheduler().scheduleOnce(
Duration.create(10, "seconds"), getSelf(), Reconnect, getContext().dispatcher()
);
} else if (msg.equals(Reconnect)) {
// Re-establish storage after the scheduled delay
initStorage();
} else {
unhandled(msg);
}
}
void forwardOrPlaceInBacklog(Object msg) {
// We need the initial value from storage before we can start delegate to the counter.
// Before that we place the messages in a backlog, to be sent to the counter when
// it is initialized.
if (counter == null) {
if (backlog.size() >= MAX_BACKLOG)
throw new ServiceUnavailable("CounterService not available, lack of initial value");
backlog.add(new SenderMsgPair(getSender(), msg));
} else {
counter.forward(msg, getContext());
}
}
}
//#messages
public interface CounterApi {
public static class UseStorage {
public final ActorRef storage;
public UseStorage(ActorRef storage) {
this.storage = storage;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), storage);
}
}
}
//#messages
/**
* The in memory count variable that will send current value to the Storage,
* if there is any storage available at the moment.
*/
public static class Counter extends UntypedActor {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
final String key;
long count;
ActorRef storage;
public Counter(String key, long initialValue) {
this.key = key;
this.count = initialValue;
}
@Override
public void onReceive(Object msg) {
log.debug("received message {}", msg);
if (msg instanceof UseStorage) {
storage = ((UseStorage) msg).storage;
storeCount();
} else if (msg instanceof Increment) {
count += ((Increment) msg).n;
storeCount();
} else if (msg.equals(GetCurrentCount)) {
getSender().tell(new CurrentCount(key, count), getSelf());
} else {
unhandled(msg);
}
}
void storeCount() {
// Delegate dangerous work, to protect our valuable state.
// We can continue without storage.
if (storage != null) {
storage.tell(new Store(new Entry(key, count)), getSelf());
}
}
}
//#messages
public interface StorageApi {
public static class Store {
public final Entry entry;
public Store(Entry entry) {
this.entry = entry;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), entry);
}
}
public static class Entry {
public final String key;
public final long value;
public Entry(String key, long value) {
this.key = key;
this.value = value;
}
public String toString() {
return String.format("%s(%s, %s)", getClass().getSimpleName(), key, value);
}
}
public static class Get {
public final String key;
public Get(String key) {
this.key = key;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), key);
}
}
public static class StorageException extends RuntimeException {
public StorageException(String msg) {
super(msg);
}
}
}
//#messages
/**
* Saves key/value pairs to persistent storage when receiving Store message.
* Replies with current value when receiving Get message. Will throw
* StorageException if the underlying data store is out of order.
*/
public static class Storage extends UntypedActor {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
final DummyDB db = DummyDB.instance;
@Override
public void onReceive(Object msg) {
log.debug("received message {}", msg);
if (msg instanceof Store) {
Store store = (Store) msg;
db.save(store.entry.key, store.entry.value);
} else if (msg instanceof Get) {
Get get = (Get) msg;
Long value = db.load(get.key);
getSender().tell(new Entry(get.key, value == null ? Long.valueOf(0L) : value), getSelf());
} else {
unhandled(msg);
}
}
}
//#dummydb
public static class DummyDB {
public static final DummyDB instance = new DummyDB();
private final Map<String, Long> db = new HashMap<String, Long>();
private DummyDB() {
}
public synchronized void save(String key, Long value) throws StorageException {
if (11 <= value && value <= 14)
throw new StorageException("Simulated store failure " + value);
db.put(key, value);
}
public synchronized Long load(String key) throws StorageException {
return db.get(key);
}
}
//#dummydb
}
//#all