Merge branch 'master' into wip-2605-java-pattern-ricklatrine

Conflicts:
	akka-docs/rst/java/howto.rst
This commit is contained in:
RickLatrine 2012-12-08 11:49:06 +01:00
commit c17b1eb263
464 changed files with 7889 additions and 3723 deletions

View file

@ -132,7 +132,7 @@ An ``ActivationTimeoutException`` is thrown if the endpoint could not be activat
Deactivation of a Consumer or a Producer actor happens when the actor is terminated. For a Consumer, the route to the actor is stopped. For a Producer, the `SendProcessor`_ is stopped.
A ``DeActivationTimeoutException`` is thrown if the associated camel objects could not be deactivated within the specified timeout.
.. _Camel: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Camel.scala
.. _Camel: @github@/akka-camel/src/main/scala/akka/camel/Camel.scala
.. _CamelContext: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/CamelContext.java
.. _ProducerTemplate: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
.. _SendProcessor: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
@ -146,7 +146,7 @@ class. For example, the following actor class (Consumer1) implements the
`getEndpointUri` method, which is declared in the `UntypedConsumerActor`_ class, in order to receive
messages from the ``file:data/input/actor`` Camel endpoint.
.. _UntypedConsumerActor: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala
.. _UntypedConsumerActor: @github@/akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala
.. includecode:: code/docs/camel/Consumer1.java#Consumer1
@ -156,7 +156,7 @@ actor. Messages consumed by actors from Camel endpoints are of type
`CamelMessage`_. These are immutable representations of Camel messages.
.. _file component: http://camel.apache.org/file2.html
.. _Message: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
.. _Message: @github@/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
Here's another example that sets the endpointUri to
@ -176,7 +176,7 @@ client the response type should be `CamelMessage`_. For any other response type,
new CamelMessage object is created by akka-camel with the actor response as message
body.
.. _Message: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
.. _Message: @github@/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
.. _camel-acknowledgements-java:
@ -221,7 +221,7 @@ The timeout on the consumer actor can be overridden with the ``replyTimeout``, a
.. includecode:: code/docs/camel/Consumer4.java#Consumer4
.. _Exchange: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/Exchange.java
.. _ask: http://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/pattern/Patterns.scala
.. _ask: @github@/akka-actor/src/main/scala/akka/pattern/Patterns.scala
Producer Actors
===============
@ -296,7 +296,7 @@ For initiating a a two-way message exchange, one of the
.. includecode:: code/docs/camel/RequestBodyActor.java#RequestProducerTemplate
.. _UntypedProducerActor: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala
.. _UntypedProducerActor: @github@/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala
.. _ProducerTemplate: https://svn.apache.org/repos/asf/camel/tags/camel-2.8.0/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
.. _camel-asynchronous-routing-java:
@ -361,7 +361,7 @@ Akka Camel components
Akka actors can be accessed from Camel routes using the `actor`_ Camel component. This component can be used to
access any Akka actor (not only consumer actors) from Camel routes, as described in the following sections.
.. _actor: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
.. _actor: @github@/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
.. _access-to-actors-java:
@ -372,7 +372,7 @@ To access actors from custom Camel routes, the `actor`_ Camel
component should be used. It fully supports Camel's `asynchronous routing
engine`_.
.. _actor: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
.. _actor: @github@/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
.. _asynchronous routing engine: http://camel.apache.org/asynchronous-routing-engine.html
This component accepts the following endpoint URI format:
@ -388,6 +388,8 @@ URI options
The following URI options are supported:
.. tabularcolumns:: |l|l|l|L|
+--------------+----------+---------+------------------------------------------------+
| Name | Type | Default | Description |
+==============+==========+=========+================================================+

View file

@ -7,14 +7,18 @@ package docs.actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.SupervisorStrategy;
import static akka.actor.SupervisorStrategy.*;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
import akka.actor.SupervisorStrategy.Directive;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.testkit.AkkaSpec;
import akka.testkit.TestProbe;
@ -23,10 +27,11 @@ import akka.testkit.ErrorFilter;
import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import static java.util.concurrent.TimeUnit.SECONDS;
import static akka.japi.Util.immutableSeq;
import akka.japi.Function;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.immutable.Seq;
import org.junit.Test;
import org.junit.BeforeClass;
@ -41,7 +46,7 @@ public class FaultHandlingTestBase {
//#strategy
private static SupervisorStrategy strategy =
new OneForOneStrategy(10, Duration.parse("1 minute"),
new OneForOneStrategy(10, Duration.create("1 minute"),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
@ -81,7 +86,7 @@ public class FaultHandlingTestBase {
//#strategy2
private static SupervisorStrategy strategy = new OneForOneStrategy(10,
Duration.parse("1 minute"),
Duration.create("1 minute"),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
@ -215,8 +220,7 @@ public class FaultHandlingTestBase {
//#testkit
public <A> Seq<A> seq(A... args) {
return JavaConverters.collectionAsScalaIterableConverter(
java.util.Arrays.asList(args)).asScala().toSeq();
return immutableSeq(args);
}
//#testkit
}

View file

@ -6,18 +6,23 @@ package docs.actor;
//#receive-timeout
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
public class MyReceivedTimeoutUntypedActor extends UntypedActor {
public MyReceivedTimeoutUntypedActor() {
getContext().setReceiveTimeout(Duration.parse("30 seconds"));
// To set an initial delay
getContext().setReceiveTimeout(Duration.create("30 seconds"));
}
public void onReceive(Object message) {
if (message.equals("Hello")) {
// To set in a response to a message
getContext().setReceiveTimeout(Duration.create("10 seconds"));
getSender().tell("Hello world", getSelf());
} else if (message == ReceiveTimeout.getInstance()) {
// To turn it off
getContext().setReceiveTimeout(Duration.Undefined());
throw new RuntimeException("received timeout");
} else {
unhandled(message);

View file

@ -5,7 +5,7 @@ package docs.actor;
//#imports1
import akka.actor.Props;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
//#imports1

View file

@ -11,7 +11,7 @@ import akka.japi.*;
import akka.dispatch.Futures;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
//#imports

View file

@ -14,7 +14,7 @@ import scala.concurrent.Future;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import scala.concurrent.Await;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.util.Timeout;
//#import-future
@ -35,7 +35,7 @@ import akka.actor.Terminated;
import static akka.pattern.Patterns.gracefulStop;
import scala.concurrent.Future;
import scala.concurrent.Await;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.pattern.AskTimeoutException;
//#import-gracefulStop
@ -44,7 +44,7 @@ import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
import scala.concurrent.Future;
import akka.dispatch.Futures;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
@ -192,7 +192,7 @@ public class UntypedActorDocTestBase {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(WatchActor.class));
Future<Object> future = Patterns.ask(myActor, "kill", 1000);
assert Await.result(future, Duration.parse("1 second")).equals("finished");
assert Await.result(future, Duration.create("1 second")).equals("finished");
system.shutdown();
}
@ -351,24 +351,23 @@ public class UntypedActorDocTestBase {
static
//#stash
public class ActorWithProtocol extends UntypedActorWithStash {
private Boolean isOpen = false;
public void onReceive(Object msg) {
if (isOpen) {
if (msg.equals("write")) {
// do writing...
} else if (msg.equals("close")) {
unstashAll();
isOpen = false;
} else {
stash();
}
if (msg.equals("open")) {
unstashAll();
getContext().become(new Procedure<Object>() {
public void apply(Object msg) throws Exception {
if (msg.equals("write")) {
// do writing...
} else if (msg.equals("close")) {
unstashAll();
getContext().unbecome();
} else {
stash();
}
}
}, false); // add behavior on top instead of replacing
} else {
if (msg.equals("open")) {
unstashAll();
isOpen = true;
} else {
stash();
}
stash();
}
}
}

View file

@ -32,9 +32,9 @@ public class UntypedActorSwapper {
@Override
public void apply(Object message) {
log.info("Ho");
getContext().unbecome(); // resets the latest 'become' (just for fun)
getContext().unbecome(); // resets the latest 'become'
}
});
}, false); // this signals stacking of the new behavior
} else {
unhandled(message);
}

View file

@ -13,7 +13,7 @@ import java.util.Map;
import akka.actor.*;
import akka.dispatch.Mapper;
import akka.japi.Function;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.util.Timeout;
import akka.event.Logging;
import akka.event.LoggingAdapter;
@ -22,7 +22,11 @@ import com.typesafe.config.ConfigFactory;
import static akka.japi.Util.classTag;
import static akka.actor.SupervisorStrategy.*;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
import akka.actor.SupervisorStrategy.Directive;
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
@ -62,7 +66,7 @@ public class FaultHandlingDocSample {
public void preStart() {
// If we don't get any progress within 15 seconds then the service
// is unavailable
getContext().setReceiveTimeout(Duration.parse("15 seconds"));
getContext().setReceiveTimeout(Duration.create("15 seconds"));
}
public void onReceive(Object msg) {
@ -237,7 +241,7 @@ public class FaultHandlingDocSample {
// Restart the storage child when StorageException is thrown.
// After 3 restarts within 5 seconds it will be stopped.
private static SupervisorStrategy strategy = new OneForOneStrategy(3,
Duration.parse("5 seconds"), new Function<Throwable, Directive>() {
Duration.create("5 seconds"), new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof StorageException) {

View file

@ -8,8 +8,8 @@ package docs.camel;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.util.Timeout;
import scala.concurrent.Future;
import scala.concurrent.util.Duration;
import scala.concurrent.util.FiniteDuration;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import static java.util.concurrent.TimeUnit.SECONDS;
//#CamelActivation

View file

@ -2,8 +2,8 @@ package docs.camel;
//#Consumer4
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
import scala.concurrent.util.Duration;
import scala.concurrent.util.FiniteDuration;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;

View file

@ -5,10 +5,6 @@ package docs.dispatcher;
//#imports
import akka.actor.*;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
//#imports
//#imports-prio
@ -37,6 +33,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.Option;
import scala.concurrent.ExecutionContext;
import com.typesafe.config.ConfigFactory;
@ -75,6 +72,14 @@ public class DispatcherDocTestBase {
.withDispatcher("my-pinned-dispatcher"));
//#defining-pinned-dispatcher
}
public void compileLookup() {
//#lookup
// this is scala.concurrent.ExecutionContext
// for use with Futures, Scheduler, etc.
final ExecutionContext ex = system.dispatchers().lookup("my-dispatcher");
//#lookup
}
@Test
public void priorityDispatcher() throws Exception {

View file

@ -119,5 +119,4 @@ public class LoggingDocTestBase {
}
}
//#deadletter-actor
}

View file

@ -9,7 +9,7 @@ import akka.actor.AbstractExtensionId;
import akka.actor.ExtensionIdProvider;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import com.typesafe.config.Config;
import java.util.concurrent.TimeUnit;

View file

@ -12,7 +12,7 @@ import akka.util.Timeout;
//#imports1
//#imports2
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.japi.Function;
import java.util.concurrent.Callable;
import static akka.dispatch.Futures.future;
@ -43,10 +43,10 @@ import scala.concurrent.ExecutionContext$;
//#imports8
import static akka.pattern.Patterns.after;
import java.util.Arrays;
//#imports8
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -79,6 +79,21 @@ public class FutureDocTestBase {
system.shutdown();
}
public final static class PrintResult<T> extends OnSuccess<T> {
@Override public final void onSuccess(T t) {
// print t
}
}
public final static class Demo {
//#print-result
public final static class PrintResult<T> extends OnSuccess<T> {
@Override public final void onSuccess(T t) {
System.out.println(t);
}
}
//#print-result
}
@SuppressWarnings("unchecked") @Test public void useCustomExecutionContext() throws Exception {
ExecutorService yourExecutorServiceGoesHere = Executors.newSingleThreadExecutor();
//#diy-execution-context
@ -102,6 +117,9 @@ public class FutureDocTestBase {
Future<Object> future = Patterns.ask(actor, msg, timeout);
String result = (String) Await.result(future, timeout.duration());
//#ask-blocking
//#pipe-to
akka.pattern.Patterns.pipe(future, system.dispatcher()).to(actor);
//#pipe-to
assertEquals("HELLO", result);
}
@ -113,9 +131,11 @@ public class FutureDocTestBase {
return "Hello" + "World";
}
}, system.dispatcher());
String result = (String) Await.result(f, Duration.create(1, SECONDS));
f.onSuccess(new PrintResult<String>(), system.dispatcher());
//#future-eval
assertEquals("HelloWorld", result);
String result = (String) Await.result(f, Duration.create(5, SECONDS));
assertEquals("HelloWorld", result);
}
@Test
@ -135,9 +155,10 @@ public class FutureDocTestBase {
}
}, ec);
int result = Await.result(f2, Duration.create(1, SECONDS));
assertEquals(10, result);
f2.onSuccess(new PrintResult<Integer>(), system.dispatcher());
//#map
int result = Await.result(f2, Duration.create(5, SECONDS));
assertEquals(10, result);
}
@Test
@ -158,8 +179,9 @@ public class FutureDocTestBase {
}
}, ec);
f2.onSuccess(new PrintResult<Integer>(), system.dispatcher());
//#map2
int result = Await.result(f2, Duration.create(1, SECONDS));
int result = Await.result(f2, Duration.create(5, SECONDS));
assertEquals(10, result);
}
@ -174,7 +196,8 @@ public class FutureDocTestBase {
}
}, ec);
Thread.sleep(100);
// Thread.sleep is only here to prove a point
Thread.sleep(100); // Do not use this in your code
Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
public Integer apply(String s) {
@ -182,8 +205,9 @@ public class FutureDocTestBase {
}
}, ec);
f2.onSuccess(new PrintResult<Integer>(), system.dispatcher());
//#map3
int result = Await.result(f2, Duration.create(1, SECONDS));
int result = Await.result(f2, Duration.create(5, SECONDS));
assertEquals(10, result);
}
@ -208,8 +232,9 @@ public class FutureDocTestBase {
}
}, ec);
f2.onSuccess(new PrintResult<Integer>(), system.dispatcher());
//#flat-map
int result = Await.result(f2, Duration.create(1, SECONDS));
int result = Await.result(f2, Duration.create(5, SECONDS));
assertEquals(10, result);
}
@ -238,8 +263,9 @@ public class FutureDocTestBase {
}
}, ec);
long result = Await.result(futureSum, Duration.create(1, SECONDS));
futureSum.onSuccess(new PrintResult<Long>(), system.dispatcher());
//#sequence
long result = Await.result(futureSum, Duration.create(5, SECONDS));
assertEquals(3L, result);
}
@ -262,9 +288,10 @@ public class FutureDocTestBase {
}, ec);
//Returns the sequence of strings as upper case
Iterable<String> result = Await.result(futureResult, Duration.create(1, SECONDS));
assertEquals(Arrays.asList("A", "B", "C"), result);
futureResult.onSuccess(new PrintResult<Iterable<String>>(), system.dispatcher());
//#traverse
Iterable<String> result = Await.result(futureResult, Duration.create(5, SECONDS));
assertEquals(Arrays.asList("A", "B", "C"), result);
}
@Test
@ -286,9 +313,10 @@ public class FutureDocTestBase {
return r + t; //Just concatenate
}
}, ec);
String result = Await.result(resultFuture, Duration.create(1, SECONDS));
//#fold
resultFuture.onSuccess(new PrintResult<String>(), system.dispatcher());
//#fold
String result = Await.result(resultFuture, Duration.create(5, SECONDS));
assertEquals("ab", result);
}
@ -310,8 +338,9 @@ public class FutureDocTestBase {
}
}, ec);
Object result = Await.result(resultFuture, Duration.create(1, SECONDS));
resultFuture.onSuccess(new PrintResult<Object>(), system.dispatcher());
//#reduce
Object result = Await.result(resultFuture, Duration.create(5, SECONDS));
assertEquals("ab", result);
}
@ -326,10 +355,10 @@ public class FutureDocTestBase {
Future<String> otherFuture = Futures.failed(
new IllegalArgumentException("Bang!"));
//#failed
Object result = Await.result(future, Duration.create(1, SECONDS));
Object result = Await.result(future, Duration.create(5, SECONDS));
assertEquals("Yay!", result);
Throwable result2 = Await.result(otherFuture.failed(),
Duration.create(1, SECONDS));
Duration.create(5, SECONDS));
assertEquals("Bang!", result2.getMessage());
}
@ -399,9 +428,11 @@ public class FutureDocTestBase {
throw problem;
}
}, ec);
int result = Await.result(future, Duration.create(1, SECONDS));
assertEquals(result, 0);
future.onSuccess(new PrintResult<Integer>(), system.dispatcher());
//#recover
int result = Await.result(future, Duration.create(5, SECONDS));
assertEquals(result, 0);
}
@Test
@ -425,9 +456,11 @@ public class FutureDocTestBase {
throw problem;
}
}, ec);
int result = Await.result(future, Duration.create(1, SECONDS));
assertEquals(result, 0);
future.onSuccess(new PrintResult<Integer>(), system.dispatcher());
//#try-recover
int result = Await.result(future, Duration.create(5, SECONDS));
assertEquals(result, 0);
}
@Test
@ -497,9 +530,10 @@ public class FutureDocTestBase {
}
}, ec);
String result = Await.result(future3, Duration.create(1, SECONDS));
assertEquals("foo bar", result);
future3.onSuccess(new PrintResult<String>(), system.dispatcher());
//#zip
String result = Await.result(future3, Duration.create(5, SECONDS));
assertEquals("foo bar", result);
}
{
@ -509,9 +543,10 @@ public class FutureDocTestBase {
Future<String> future3 = Futures.successful("bar");
// Will have "bar" in this case
Future<String> future4 = future1.fallbackTo(future2).fallbackTo(future3);
String result = Await.result(future4, Duration.create(1, SECONDS));
assertEquals("bar", result);
future4.onSuccess(new PrintResult<String>(), system.dispatcher());
//#fallback-to
String result = Await.result(future4, Duration.create(5, SECONDS));
assertEquals("bar", result);
}
}
@ -529,7 +564,7 @@ public class FutureDocTestBase {
return "foo";
}
}, ec);
Future<String> result = future.either(delayed);
Future<String> result = Futures.firstCompletedOf(Arrays.asList(future, delayed), ec);
//#after
Await.result(result, Duration.create(2, SECONDS));
}

View file

@ -11,6 +11,7 @@ import static docs.jrouting.CustomRouterDocTestBase.Message.RepublicanVote;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.After;
@ -19,7 +20,7 @@ import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.OneForOneStrategy;
@ -68,8 +69,8 @@ public class CustomRouterDocTestBase {
public void demonstrateSupervisor() {
//#supervision
final SupervisorStrategy strategy =
new OneForOneStrategy(5, Duration.parse("1 minute"),
new Class<?>[] { Exception.class });
new OneForOneStrategy(5, Duration.create("1 minute"),
Collections.<Class<? extends Throwable>>singletonList(Exception.class));
final ActorRef router = system.actorOf(new Props(MyActor.class)
.withRouter(new RoundRobinRouter(5).withSupervisorStrategy(strategy)));
//#supervision
@ -179,16 +180,14 @@ public class CustomRouterDocTestBase {
//#crRoutingLogic
return new CustomRoute() {
@Override
public Iterable<Destination> destinationsFor(ActorRef sender, Object msg) {
public scala.collection.immutable.Seq<Destination> destinationsFor(ActorRef sender, Object msg) {
switch ((Message) msg) {
case DemocratVote:
case DemocratCountResult:
return Arrays.asList(
new Destination[] { new Destination(sender, democratActor) });
return akka.japi.Util.immutableSingletonSeq(new Destination(sender, democratActor));
case RepublicanVote:
case RepublicanCountResult:
return Arrays.asList(
new Destination[] { new Destination(sender, republicanActor) });
return akka.japi.Util.immutableSingletonSeq(new Destination(sender, republicanActor));
default:
throw new IllegalArgumentException("Unknown message: " + msg);
}

View file

@ -11,7 +11,7 @@ import akka.routing.SmallestMailboxRouter;
import akka.actor.UntypedActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.util.Timeout;
import scala.concurrent.Future;
import scala.concurrent.Await;

View file

@ -70,7 +70,7 @@ public class RouterViaProgramExample {
int upperBound = 15;
DefaultResizer resizer = new DefaultResizer(lowerBound, upperBound);
ActorRef router3 = system.actorOf(
new Props(ExampleActor.class).withRouter(new RoundRobinRouter(nrOfInstances)));
new Props(ExampleActor.class).withRouter(new RoundRobinRouter(resizer)));
//#programmaticRoutingWithResizer
for (int i = 1; i <= 6; i++) {
router3.tell(new ExampleActor.Message(i), null);

View file

@ -0,0 +1,191 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.pattern;
import akka.actor.*;
import akka.testkit.*;
import akka.testkit.TestEvent.Mute;
import akka.testkit.TestEvent.UnMute;
import org.junit.*;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class SchedulerPatternTest {
static ActorSystem system;
@BeforeClass
public static void setUp() {
system = ActorSystem.create("SchedulerPatternTest", AkkaSpec.testConf());
}
@AfterClass
public static void tearDown() {
system.shutdown();
}
static
//#schedule-constructor
public class ScheduleInConstructor extends UntypedActor {
private final Cancellable tick = getContext().system().scheduler().schedule(
Duration.create(500, TimeUnit.MILLISECONDS),
Duration.create(1000, TimeUnit.MILLISECONDS),
getSelf(), "tick", getContext().dispatcher());
//#schedule-constructor
// this variable and constructor is declared here to not show up in the docs
final ActorRef target;
public ScheduleInConstructor(ActorRef target) {
this.target = target;
}
//#schedule-constructor
@Override
public void postStop() {
tick.cancel();
}
@Override
public void onReceive(Object message) throws Exception {
if (message.equals("tick")) {
// do something useful here
//#schedule-constructor
target.tell(message, getSelf());
//#schedule-constructor
}
//#schedule-constructor
else if (message.equals("restart")) {
throw new ArithmeticException();
}
//#schedule-constructor
else {
unhandled(message);
}
}
}
//#schedule-constructor
static
//#schedule-receive
public class ScheduleInReceive extends UntypedActor {
//#schedule-receive
// this variable and constructor is declared here to not show up in the docs
final ActorRef target;
public ScheduleInReceive(ActorRef target) {
this.target = target;
}
//#schedule-receive
@Override
public void preStart() {
getContext().system().scheduler().scheduleOnce(
Duration.create(500, TimeUnit.MILLISECONDS),
getSelf(), "tick", getContext().dispatcher());
}
// override postRestart so we don't call preStart and schedule a new message
@Override
public void postRestart(Throwable reason) {
}
@Override
public void onReceive(Object message) throws Exception {
if (message.equals("tick")) {
// send another periodic tick after the specified delay
getContext().system().scheduler().scheduleOnce(
Duration.create(1000, TimeUnit.MILLISECONDS),
getSelf(), "tick", getContext().dispatcher());
// do something useful here
//#schedule-receive
target.tell(message, getSelf());
//#schedule-receive
}
//#schedule-receive
else if (message.equals("restart")) {
throw new ArithmeticException();
}
//#schedule-receive
else {
unhandled(message);
}
}
}
//#schedule-receive
@Test
@Ignore // no way to tag this as timing sensitive
public void scheduleInConstructor() {
new TestSchedule(system) {{
final JavaTestKit probe = new JavaTestKit(system);
final Props props = new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new ScheduleInConstructor(probe.getRef());
}
});
testSchedule(probe, props, duration("3000 millis"), duration("2000 millis"));
}};
}
@Test
@Ignore // no way to tag this as timing sensitive
public void scheduleInReceive() {
new TestSchedule(system) {{
final JavaTestKit probe = new JavaTestKit(system);
final Props props = new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new ScheduleInReceive(probe.getRef());
}
});
testSchedule(probe, props, duration("3000 millis"), duration("2500 millis"));
}};
}
public static class TestSchedule extends JavaTestKit {
private ActorSystem system;
public TestSchedule(ActorSystem system) {
super(system);
this.system = system;
}
public void testSchedule(final JavaTestKit probe, Props props,
FiniteDuration startDuration,
FiniteDuration afterRestartDuration) {
Iterable<akka.testkit.EventFilter> filter =
Arrays.asList(new akka.testkit.EventFilter[]{
(akka.testkit.EventFilter) new ErrorFilter(ArithmeticException.class)});
try {
system.eventStream().publish(new Mute(filter));
final ActorRef actor = system.actorOf(props);
new Within(startDuration) {
protected void run() {
probe.expectMsgEquals("tick");
probe.expectMsgEquals("tick");
probe.expectMsgEquals("tick");
}
};
actor.tell("restart", getRef());
new Within(afterRestartDuration) {
protected void run() {
probe.expectMsgEquals("tick");
probe.expectMsgEquals("tick");
}
};
system.stop(actor);
}
finally {
system.eventStream().publish(new UnMute(filter));
}
}
}
}

View file

@ -7,6 +7,8 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.typesafe.config.ConfigFactory;
//#import
import akka.actor.ActorRef;
import akka.actor.Address;
@ -60,6 +62,14 @@ public class RemoteDeploymentDocTestBase {
actor.tell("Pretty slick", null);
//#sample-actor
}
@Test
public void demonstrateProgrammaticConfig() {
//#programmatic
ConfigFactory.parseString("akka.remote.netty.hostname=\"1.2.3.4\"")
.withFallback(ConfigFactory.load());
//#programmatic
}
}

View file

@ -138,12 +138,7 @@ public class SerializationDocTestBase {
}
public Address getAddress() {
final ActorRefProvider provider = system.provider();
if (provider instanceof RemoteActorRefProvider) {
return ((RemoteActorRefProvider) provider).transport().address();
} else {
throw new UnsupportedOperationException("need RemoteActorRefProvider");
}
return system.provider().getDefaultAddress();
}
}

View file

@ -26,7 +26,7 @@ import akka.testkit.TestActor;
import akka.testkit.TestActor.AutoPilot;
import akka.testkit.TestActorRef;
import akka.testkit.JavaTestKit;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
public class TestKitDocTest {

View file

@ -14,7 +14,7 @@ import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.testkit.JavaTestKit;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
public class TestKitSampleTest {

View file

@ -30,7 +30,7 @@ import akka.actor.UntypedActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.serialization.SerializationExtension;
import akka.serialization.Serialization;
import java.io.Serializable;
@ -99,6 +99,9 @@ public class ZeromqDocTestBase {
pubSocket.tell(new ZMQMessage(new Frame("foo.bar"), new Frame(payload)), null);
//#pub-topic
system.stop(subSocket);
system.stop(subTopicSocket);
//#high-watermark
ActorRef highWatermarkSocket = ZeroMQExtension.get(system).newRouterSocket(
new SocketOption[] { new Listener(listener),

View file

@ -13,6 +13,15 @@ Default dispatcher
Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``.
The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases.
.. _dispatcher-lookup-java:
Looking up a Dispatcher
-----------------------
Dispatchers implement the :class:`ExecutionContext` interface and can thus be used to run :class:`Future` invocations etc.
.. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#lookup
Setting the dispatcher for an Actor
-----------------------------------

View file

@ -185,7 +185,7 @@ at runtime::
system.eventStream.setLogLevel(Logging.DebugLevel());
This means that log events for a level which will not be logged are not
This means that log events for a level which will not be logged are
typically not dispatched at all (unless manual subscriptions to the respective
event class have been done)

View file

@ -24,9 +24,6 @@ sample as it is easy to follow the log output to understand what is happening in
fault-tolerance-sample
.. includecode:: code/docs/actor/japi/FaultHandlingDocSample.java#all
:exclude: imports,messages,dummydb
Creating a Supervisor Strategy
------------------------------

View file

@ -47,6 +47,17 @@ Alternatives to blocking are discussed further within this documentation.
Also note that the ``Future`` returned by an ``UntypedActor`` is a ``Future<Object>`` since an ``UntypedActor`` is dynamic.
That is why the cast to ``String`` is used in the above sample.
.. warning::
``Await.result`` and ``Await.ready`` are provided for exceptional situations where you **must** block,
a good rule of thumb is to only use them if you know why you **must** block. For all other cases, use
asynchronous composition as described below.
To send the result of a ``Future`` to an ``Actor``, you can use the ``pipe`` construct:
.. includecode:: code/docs/future/FutureDocTestBase.java
:include: pipe-to
Use Directly
------------
@ -75,6 +86,11 @@ Or failures:
.. includecode:: code/docs/future/FutureDocTestBase.java
:include: failed
For these examples ``PrintResult`` is defined as follows:
.. includecode:: code/docs/future/FutureDocTestBase.java
:include: print-result
Functional Futures
------------------

View file

@ -16,6 +16,37 @@ sense to add to the ``akka.pattern`` package for creating an `OTP-like library
You might find some of the patterns described in the Scala chapter of
:ref:`howto-scala` useful even though the example code is written in Scala.
Scheduling Periodic Messages
============================
This pattern describes how to schedule periodic messages to yourself in two different
ways.
The first way is to set up periodic message scheduling in the constructor of the actor,
and cancel that scheduled sending in ``postStop`` or else we might have multiple registered
message sends to the same actor.
.. note::
With this approach the scheduled periodic message send will be restarted with the actor on restarts.
This also means that the time period that elapses between two tick messages during a restart may drift
off based on when you restart the scheduled message sends relative to the time that the last message was
sent, and how long the initial delay is. Worst case scenario is ``interval`` plus ``initialDelay``.
.. includecode:: code/docs/pattern/SchedulerPatternTest.java#schedule-constructor
The second variant sets up an initial one shot message send in the ``preStart`` method
of the actor, and the then the actor when it receives this message sets up a new one shot
message send. You also have to override ``postRestart`` so we don't call ``preStart``
and schedule the initial message send again.
.. note::
With this approach we won't fill up the mailbox with tick messages if the actor is
under pressure, but only schedule a new tick message when we have seen the previous one.
.. includecode:: code/docs/pattern/SchedulerPatternTest.java#schedule-receive
Single-Use Actor Trees with High-Level Error Reporting
======================================================
@ -69,4 +100,3 @@ This is an especially nice pattern, since it does even come with some empty exam
Spread the word: this is the easiest way to get famous!
Please keep this pattern at the end of this file.

View file

@ -194,8 +194,7 @@ It has one single dependency; the slf4j-api jar. In runtime you also need a SLF4
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.4</version>
<scope>runtime</scope>
<version>1.0.7</version>
</dependency>
You need to enable the Slf4jEventHandler in the 'event-handlers' element in

View file

@ -10,7 +10,7 @@ having to create a launcher script.
The Akka Microkernel is included in the Akka download found at `downloads`_.
.. _downloads: http://akka.io/downloads
.. _downloads: http://typesafe.com/stack/downloads/akka
To run an application with the microkernel you need to create a Bootable class
that handles the startup and shutdown the application. An example is included below.
@ -19,11 +19,7 @@ Put your application jar in the ``deploy`` directory to have it automatically
loaded.
To start the kernel use the scripts in the ``bin`` directory, passing the boot
classes for your application.
There is a simple example of an application setup for running with the
microkernel included in the akka download. This can be run with the following
command (on a unix-based system):
classes for your application. Example command (on a unix-based system):
.. code-block:: none

View file

@ -60,6 +60,13 @@ reference file for more information:
.. literalinclude:: ../../../akka-remote/src/main/resources/reference.conf
:language: none
.. note::
Setting properties like the listening IP and port number programmatically is
best done by using something like the following:
.. includecode:: code/docs/remoting/RemoteDeploymentDocTestBase.java#programmatic
Looking up Remote Actors
^^^^^^^^^^^^^^^^^^^^^^^^
@ -115,6 +122,15 @@ actor systems has to have a JAR containing the class.
object, which in most cases is not serializable. It is best to make a static
inner class which implements :class:`UntypedActorFactory`.
.. note::
You can use asterisks as wildcard matches for the actor path sections, so you could specify:
``/*/sampleActor`` and that would match all ``sampleActor`` on that level in the hierarchy.
You can also use wildcard in the last position to match all actors at a certain level:
``/someParent/*``. Non-wildcard matches always have higher priority to match than wildcards, so:
``/foo/bar`` is considered **more specific** than ``/foo/*`` and only the highest priority match is used.
Please note that it **cannot** be used to partially match section, like this: ``/foo*/bar``, ``/f*o/bar`` etc.
.. warning::
*Caveat:* Remote deployment ties both systems together in a tight fashion,
@ -186,7 +202,7 @@ Description of the Remoting Sample
There is a more extensive remote example that comes with the Akka distribution.
Please have a look here for more information: `Remote Sample
<https://github.com/akka/akka/tree/master/akka-samples/akka-sample-remote>`_
<@github@/akka-samples/akka-sample-remote>`_
This sample demonstrates both, remote deployment and look-up of remote actors.
First, let us have a look at the common setup for both scenarios (this is
``common.conf``):

View file

@ -66,7 +66,7 @@ In addition to being able to supply looked-up remote actors as routees, you can
make the router deploy its created children on a set of remote hosts; this will
be done in round-robin fashion. In order to do that, wrap the router
configuration in a :class:`RemoteRouterConfig`, attaching the remote addresses of
the nodes to deploy to. Naturally, this requires your to include the
the nodes to deploy to. Naturally, this requires you to include the
``akka-remote`` module on your classpath:
.. includecode:: code/docs/jrouting/RouterViaProgramExample.java#remoteRoutees
@ -114,7 +114,7 @@ Routers vs. Supervision
^^^^^^^^^^^^^^^^^^^^^^^
As explained in the previous section, routers create new actor instances as
children of the “head” router, who therefor also is their supervisor. The
children of the “head” router, who therefore also is their supervisor. The
supervisor strategy of this actor can be configured by means of the
:meth:`RouterConfig.supervisorStrategy` property, which is supported for all
built-in router types. It defaults to “always escalate”, which leads to the
@ -434,7 +434,7 @@ Configured Custom Router
It is possible to define configuration properties for custom routers. In the ``router`` property of the deployment
configuration you define the fully qualified class name of the router class. The router class must extend
``akka.routing.CustomRouterConfig`` and and have constructor with ``com.typesafe.config.Config`` parameter.
``akka.routing.CustomRouterConfig`` and have constructor with one ``com.typesafe.config.Config`` parameter.
The deployment section of the configuration is passed to the constructor.
Custom Resizer

View file

@ -149,16 +149,12 @@ concrete address handy you can create a dummy one for the right protocol using
``new Address(protocol, "", "", 0)`` (assuming that the actual transport used is as
lenient as Akkas RemoteActorRefProvider).
There is a possible simplification available if you are just using the default
:class:`NettyRemoteTransport` with the :meth:`RemoteActorRefProvider`, which is
enabled by the fact that this combination has just a single remote address:
There is also a default remote address which is the one used by cluster support
(and typical systems have just this one); you can get it like this:
.. includecode:: code/docs/serialization/SerializationDocTestBase.java
:include: external-address-default
This solution has to be adapted once other providers are used (like the planned
extensions for clustering).
Deep serialization of Actors
----------------------------

View file

@ -127,10 +127,11 @@ UntypedActor API
The :class:`UntypedActor` class defines only one abstract method, the above mentioned
:meth:`onReceive(Object message)`, which implements the behavior of the actor.
If the current actor behavior does not match a received message,
:meth:`unhandled` is called, which by default publishes a ``new
If the current actor behavior does not match a received message, it's recommended that
you call the :meth:`unhandled` method, which by default publishes a ``new
akka.actor.UnhandledMessage(message, sender, recipient)`` on the actor systems
event stream.
event stream (set configuration item ``akka.actor.debug.unhandled`` to ``on``
to have them converted into actual Debug messages).
In addition, it offers:
@ -431,13 +432,20 @@ defaults to a 'dead-letter' actor ref.
getSender().tell(result); // will have dead-letter actor as default
}
Initial receive timeout
=======================
Receive timeout
===============
A timeout mechanism can be used to receive a message when no initial message is
received within a certain time. To receive this timeout you have to set the
``receiveTimeout`` property and declare handing for the ReceiveTimeout
message.
The `UntypedActorContext` :meth:`setReceiveTimeout` defines the inactivity timeout after which
the sending of a `ReceiveTimeout` message is triggered.
When specified, the receive function should be able to handle an `akka.actor.ReceiveTimeout` message.
1 millisecond is the minimum supported timeout.
Please note that the receive timeout might fire and enqueue the `ReceiveTimeout` message right after
another message was enqueued; hence it is **not guaranteed** that upon reception of the receive
timeout there must have been an idle period beforehand as configured via this method.
Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity
periods). Pass in `Duration.Undefined` to switch off this feature.
.. includecode:: code/docs/actor/MyReceivedTimeoutUntypedActor.java#receive-timeout
@ -542,7 +550,8 @@ Upgrade
Akka supports hotswapping the Actors message loop (e.g. its implementation) at
runtime. Use the ``getContext().become`` method from within the Actor.
The hotswapped code is kept in a Stack which can be pushed and popped.
The hotswapped code is kept in a Stack which can be pushed (replacing or adding
at the top) and popped.
.. warning::
@ -556,26 +565,19 @@ To hotswap the Actor using ``getContext().become``:
.. includecode:: code/docs/actor/UntypedActorDocTestBase.java
:include: hot-swap-actor
The ``become`` method is useful for many different things, such as to implement
a Finite State Machine (FSM).
This variant of the :meth:`become` method is useful for many different things,
such as to implement a Finite State Machine (FSM). It will replace the current
behavior (i.e. the top of the behavior stack), which means that you do not use
:meth:`unbecome`, instead always the next behavior is explicitly installed.
Here is another little cute example of ``become`` and ``unbecome`` in action:
The other way of using :meth:`become` does not replace but add to the top of
the behavior stack. In this case care must be taken to ensure that the number
of “pop” operations (i.e. :meth:`unbecome`) matches the number of “push” ones
in the long run, otherwise this amounts to a memory leak (which is why this
behavior is not the default).
.. includecode:: code/docs/actor/UntypedActorSwapper.java#swapper
Downgrade
---------
Since the hotswapped code is pushed to a Stack you can downgrade the code as
well. Use the ``getContext().unbecome`` method from within the Actor.
.. code-block:: java
public void onReceive(Object message) {
if (message.equals("revert")) getContext().unbecome();
}
Stash
=====
@ -620,9 +622,11 @@ The stash is backed by a ``scala.collection.immutable.Vector``. As a
result, even a very large number of messages may be stashed without a
major impact on performance.
Note that the stash is not persisted across restarts of an actor,
unlike the actor's mailbox. Therefore, it should be managed like other
parts of the actor's state which have the same property.
Note that the stash is part of the ephemeral actor state, unlike the
mailbox. Therefore, it should be managed like other parts of the
actor's state which have the same property. The :class:`Stash` traits
implementation of :meth:`preRestart` will call ``unstashAll()``, which is
usually the desired behavior.
Killing an Actor