Fix merge issues

phaller 2012-02-24 16:32:00 +01:00
commit 601ef17f54
140 changed files with 2372 additions and 1308 deletions

View file

@ -39,7 +39,7 @@ public class JavaFutureTests {
}
@Test
public void mustBeAbleToMapAFuture() {
public void mustBeAbleToMapAFuture() throws Exception {
Future<String> f1 = Futures.future(new Callable<String>() {
public String call() {
@ -163,7 +163,7 @@ public class JavaFutureTests {
// TODO: Improve this test, perhaps with an Actor
@Test
public void mustSequenceAFutureList() {
public void mustSequenceAFutureList() throws Exception{
LinkedList<Future<String>> listFutures = new LinkedList<Future<String>>();
LinkedList<String> listExpected = new LinkedList<String>();
@ -183,7 +183,7 @@ public class JavaFutureTests {
// TODO: Improve this test, perhaps with an Actor
@Test
public void foldForJavaApiMustWork() {
public void foldForJavaApiMustWork() throws Exception{
LinkedList<Future<String>> listFutures = new LinkedList<Future<String>>();
StringBuilder expected = new StringBuilder();
@ -206,7 +206,7 @@ public class JavaFutureTests {
}
@Test
public void reduceForJavaApiMustWork() {
public void reduceForJavaApiMustWork() throws Exception{
LinkedList<Future<String>> listFutures = new LinkedList<Future<String>>();
StringBuilder expected = new StringBuilder();
@ -229,7 +229,7 @@ public class JavaFutureTests {
}
@Test
public void traverseForJavaApiMustWork() {
public void traverseForJavaApiMustWork() throws Exception{
LinkedList<String> listStrings = new LinkedList<String>();
LinkedList<String> expectedStrings = new LinkedList<String>();
@ -252,7 +252,7 @@ public class JavaFutureTests {
}
@Test
public void findForJavaApiMustWork() {
public void findForJavaApiMustWork() throws Exception{
LinkedList<Future<Integer>> listFutures = new LinkedList<Future<Integer>>();
for (int i = 0; i < 10; i++) {
final Integer fi = i;
@ -273,7 +273,7 @@ public class JavaFutureTests {
}
@Test
public void blockMustBeCallable() {
public void blockMustBeCallable() throws Exception {
Promise<String> p = Futures.promise(system.dispatcher());
Duration d = Duration.create(1, TimeUnit.SECONDS);
p.success("foo");
@ -282,7 +282,7 @@ public class JavaFutureTests {
}
@Test
public void mapToMustBeCallable() {
public void mapToMustBeCallable() throws Exception {
Promise<Object> p = Futures.promise(system.dispatcher());
Future<String> f = p.future().mapTo(manifest(String.class));
Duration d = Duration.create(1, TimeUnit.SECONDS);
@ -292,7 +292,7 @@ public class JavaFutureTests {
}
@Test
public void recoverToMustBeCallable() {
public void recoverToMustBeCallable() throws Exception {
final IllegalStateException fail = new IllegalStateException("OHNOES");
Promise<Object> p = Futures.promise(system.dispatcher());
Future<Object> f = p.future().recover(new Recover<Object>() {
@ -309,7 +309,7 @@ public class JavaFutureTests {
}
@Test
public void recoverWithToMustBeCallable() {
public void recoverWithToMustBeCallable() throws Exception{
final IllegalStateException fail = new IllegalStateException("OHNOES");
Promise<Object> p = Futures.promise(system.dispatcher());
Future<Object> f = p.future().recoverWith(new Recover<Future<Object>>() {

View file

@ -74,6 +74,14 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
callbackWasRun must be(true)
}
"return isTerminated status correctly" in {
val system = ActorSystem()
system.isTerminated must be(false)
system.shutdown()
system.awaitTermination()
system.isTerminated must be(true)
}
"throw RejectedExecutionException when shutdown" in {
val system2 = ActorSystem("AwaitTermination", AkkaSpec.testConf)
system2.shutdown()

View file

@ -66,15 +66,11 @@ object ActorWithStashSpec {
var expectedException: TestLatch = null
}
val testConf: Config = ConfigFactory.parseString("""
akka {
actor {
default-dispatcher {
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
}
}
}
""")
val testConf = """
my-dispatcher {
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
}
"""
}
@ -88,14 +84,14 @@ class ActorWithStashSpec extends AkkaSpec(ActorWithStashSpec.testConf) with Defa
system.eventStream.publish(Mute(EventFilter[Exception]("Crashing...")))
}
override def beforeEach() = {
state.finished.reset
}
override def beforeEach() = state.finished.reset
def myProps(creator: Actor): Props = Props(creator).withDispatcher("my-dispatcher")
"An Actor with Stash" must {
"stash messages" in {
val stasher = system.actorOf(Props(new StashingActor))
val stasher = system.actorOf(myProps(new StashingActor))
stasher ! "bye"
stasher ! "hello"
state.finished.await
@ -103,7 +99,7 @@ class ActorWithStashSpec extends AkkaSpec(ActorWithStashSpec.testConf) with Defa
}
"support protocols" in {
val protoActor = system.actorOf(Props(new ActorWithProtocol))
val protoActor = system.actorOf(myProps(new ActorWithProtocol))
protoActor ! "open"
protoActor ! "write"
protoActor ! "open"
@ -116,20 +112,20 @@ class ActorWithStashSpec extends AkkaSpec(ActorWithStashSpec.testConf) with Defa
"throw an IllegalStateException if the same messages is stashed twice" in {
state.expectedException = new TestLatch
val stasher = system.actorOf(Props(new StashingTwiceActor))
val stasher = system.actorOf(myProps(new StashingTwiceActor))
stasher ! "hello"
stasher ! "hello"
Await.ready(state.expectedException, 10 seconds)
}
"process stashed messages after restart" in {
val boss = system.actorOf(Props(new Supervisor(
val boss = system.actorOf(myProps(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable])))))
val restartLatch = new TestLatch
val hasMsgLatch = new TestLatch
val slaveProps = Props(new Actor with Stash {
val slaveProps = myProps(new Actor with Stash {
protected def receive = {
case "crash"
throw new Exception("Crashing...")

View file

@ -60,5 +60,22 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul
assert(Await.result(actor4 ? "status", timeout.duration) == "OK", "actor4 is shutdown")
}
}
"be able to create named children in its constructor" in {
val a = system.actorOf(Props(new Actor {
context.actorOf(Props.empty, "bob")
def receive = {
case x: Exception ⇒ throw x
}
override def preStart(): Unit = testActor ! "preStart"
}))
val m = "weird message"
EventFilter[Exception](m, occurrences = 1) intercept {
a ! new Exception(m)
}
expectMsg("preStart")
expectMsg("preStart")
a.isTerminated must be(false)
}
}
}

View file

@ -12,13 +12,13 @@ import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
import akka.testkit.{ EventFilter, filterEvents, AkkaSpec }
import akka.serialization.SerializationExtension
import akka.actor.TypedActor.{ PostRestart, PreRestart, PostStop, PreStart }
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.japi.{ Creator, Option ⇒ JOption }
import akka.testkit.DefaultTimeout
import akka.dispatch.{ Await, Dispatchers, Future, Promise }
import akka.pattern.ask
import akka.serialization.JavaSerializer
import akka.actor.TypedActor._
object TypedActorSpec {
@ -160,7 +160,7 @@ object TypedActorSpec {
def crash(): Unit
}
class LifeCyclesImpl(val latch: CountDownLatch) extends PreStart with PostStop with PreRestart with PostRestart with LifeCycles {
class LifeCyclesImpl(val latch: CountDownLatch) extends PreStart with PostStop with PreRestart with PostRestart with LifeCycles with Receiver {
override def crash(): Unit = throw new IllegalStateException("Crash!")
@ -171,6 +171,12 @@ object TypedActorSpec {
override def preRestart(reason: Throwable, message: Option[Any]): Unit = for (i ← 1 to 5) latch.countDown()
override def postRestart(reason: Throwable): Unit = for (i ← 1 to 7) latch.countDown()
override def onReceive(msg: Any, sender: ActorRef): Unit = {
msg match {
case "pigdog" sender ! "dogpig"
}
}
}
}
@ -415,6 +421,16 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
EventFilter[IllegalStateException]("Crash!", occurrences = 1) intercept {
t.crash()
}
//Sneak in a check for the Receiver override
val ref = ta getActorRefFor t
ref.tell("pigdog", testActor)
expectMsg(timeout.duration, "dogpig")
//Done with that now
ta.poisonPill(t)
latch.await(10, TimeUnit.SECONDS) must be === true
}

View file

@ -6,15 +6,14 @@ import java.util.concurrent.ConcurrentLinkedQueue
import akka.util._
import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.actor.ActorRef
import akka.actor.ActorContext
import akka.actor.{ ActorRef, ActorContext, Props, LocalActorRef }
import com.typesafe.config.Config
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
def name: String
def factory: MailboxType ⇒ Mailbox
def factory: MailboxType ⇒ MessageQueue
name should {
"create an unbounded mailbox" in {
@ -77,7 +76,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters)(system)
def ensureInitialMailboxState(config: MailboxType, q: Mailbox) {
def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) {
q must not be null
q match {
case aQueue: BlockingQueue[_] ⇒
@ -136,8 +135,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
class DefaultMailboxSpec extends MailboxSpec {
lazy val name = "The default mailbox implementation"
def factory = {
case u: UnboundedMailbox ⇒ u.create(null)
case b: BoundedMailbox ⇒ b.create(null)
case u: UnboundedMailbox ⇒ u.create(None)
case b: BoundedMailbox ⇒ b.create(None)
}
}
@ -145,8 +144,8 @@ class PriorityMailboxSpec extends MailboxSpec {
val comparator = PriorityGenerator(_.##)
lazy val name = "The priority mailbox implementation"
def factory = {
case UnboundedMailbox() ⇒ UnboundedPriorityMailbox(comparator).create(null)
case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null)
case UnboundedMailbox() ⇒ new UnboundedPriorityMailbox(comparator).create(None)
case BoundedMailbox(capacity, pushTimeOut) ⇒ new BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(None)
}
}
@ -158,11 +157,13 @@ object CustomMailboxSpec {
"""
class MyMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new MyMailbox(owner)
override def create(owner: Option[ActorContext]) = owner match {
case Some(o) ⇒ new MyMailbox(o)
case None ⇒ throw new Exception("no mailbox owner given")
}
}
class MyMailbox(owner: ActorContext) extends CustomMailbox(owner)
with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
class MyMailbox(owner: ActorContext) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]()
}
}
@ -171,8 +172,9 @@ object CustomMailboxSpec {
class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) {
"Dispatcher configuration" must {
"support custom mailboxType" in {
val dispatcher = system.dispatchers.lookup("my-dispatcher")
dispatcher.createMailbox(null).getClass must be(classOf[CustomMailboxSpec.MyMailbox])
val actor = system.actorOf(Props.empty.withDispatcher("my-dispatcher"))
val queue = actor.asInstanceOf[LocalActorRef].underlying.mailbox.messageQueue
queue.getClass must be(classOf[CustomMailboxSpec.MyMailbox])
}
}
}
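
The hunks above switch MailboxType.create from a nullable ActorContext to Option[ActorContext] and make the spec verify the custom mailbox through a live actor rather than via dispatcher.createMailbox(null). For orientation, a self-contained sketch of the new custom-mailbox shape, assembled from the MyMailboxType/MyMailbox code in this diff (import paths assumed from the Akka 2.0 package layout):

import java.util.concurrent.ConcurrentLinkedQueue
import com.typesafe.config.Config
import akka.actor.ActorContext
import akka.dispatch.{ Envelope, MailboxType, QueueBasedMessageQueue, UnboundedMessageQueueSemantics }

class MyMailboxType(config: Config) extends MailboxType {
  // create() now receives the owning actor as an Option instead of a possibly-null reference
  override def create(owner: Option[ActorContext]) = owner match {
    case Some(o) ⇒ new MyMailbox(o)
    case None    ⇒ throw new Exception("no mailbox owner given")
  }
}

class MyMailbox(owner: ActorContext) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
  final val queue = new ConcurrentLinkedQueue[Envelope]() // backing queue for user messages
}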

View file

@ -56,6 +56,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
"decorate a Receive" in {
new TestKit(appLogging) {
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
system.eventStream.subscribe(testActor, classOf[UnhandledMessage])
val a = system.actorOf(Props(new Actor {
def receive = new LoggingReceive(Some("funky"), {
case null ⇒
@ -63,6 +64,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
}))
a ! "hallo"
expectMsg(1 second, Logging.Debug("funky", classOf[DummyClassForStringSources], "received unhandled message hallo"))
expectMsgType[UnhandledMessage](1 second)
}
}

View file

@ -53,6 +53,7 @@ object RoutingSpec {
}
}
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
}
}
@ -126,6 +127,44 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
system.stop(router)
}
"set supplied supervisorStrategy" in {
//#supervision
val escalator = OneForOneStrategy() {
//#custom-strategy
case e ⇒ testActor ! e; SupervisorStrategy.Escalate
//#custom-strategy
}
val router = system.actorOf(Props.empty.withRouter(
RoundRobinRouter(1, supervisorStrategy = escalator)))
//#supervision
router ! CurrentRoutees
EventFilter[ActorKilledException](occurrences = 2) intercept {
expectMsgType[RouterRoutees].routees.head ! Kill
}
expectMsgType[ActorKilledException]
}
"default to all-for-one-always-escalate strategy" in {
val restarter = OneForOneStrategy() {
case e ⇒ testActor ! e; SupervisorStrategy.Restart
}
val supervisor = system.actorOf(Props(new Supervisor(restarter)))
supervisor ! Props(new Actor {
def receive = {
case x: String ⇒ throw new Exception(x)
}
override def postRestart(reason: Throwable): Unit = testActor ! "restarted"
}).withRouter(RoundRobinRouter(3))
val router = expectMsgType[ActorRef]
EventFilter[Exception]("die", occurrences = 2) intercept {
router ! "die"
}
expectMsgType[Exception].getMessage must be("die")
expectMsg("restarted")
expectMsg("restarted")
expectMsg("restarted")
}
}
"no router" must {
@ -542,6 +581,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
case class VoteCountRouter() extends RouterConfig {
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
//#crRoute
def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = {

View file

@ -9,6 +9,7 @@ import com.typesafe.config.Config
import akka.dispatch.DispatcherPrerequisites
import akka.dispatch.MessageDispatcher
import akka.dispatch.MessageDispatcherConfigurator
import akka.dispatch.UnboundedMailbox
object CallingThreadDispatcherModelSpec {
import ActorModelSpec._
@ -31,7 +32,7 @@ object CallingThreadDispatcherModelSpec {
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher =
new CallingThreadDispatcher(prerequisites) with MessageDispatcherInterceptor {
new CallingThreadDispatcher(prerequisites, UnboundedMailbox()) with MessageDispatcherInterceptor {
override def id: String = config.getString("id")
}

File diff suppressed because it is too large

View file

@ -5,8 +5,6 @@
*/
package akka.jsr166y;
import akka.util.Unsafe;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
@ -23,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.lang.reflect.Constructor;
import akka.util.Unsafe;
/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
@ -199,45 +198,37 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* methods in a way that flows well in javadocs.
*/
/**
* The number of times to try to help join a task without any
* apparent progress before giving up and blocking. The value is
* arbitrary but should be large enough to cope with transient
* stalls (due to GC etc) that can cause helping methods not to be
* able to proceed because other workers have not progressed to
* the point where subtasks can be found or taken.
*/
private static final int HELP_RETRIES = 32;
/*
* The status field holds run control status bits packed into a
* single int to minimize footprint and to ensure atomicity (via
* CAS). Status is initially zero, and takes on nonnegative
* values until completed, upon which status holds value
* NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
* waits by other threads have the SIGNAL bit set. Completion of
* a stolen task with SIGNAL set awakens any waiters via
* notifyAll. Even though suboptimal for some purposes, we use
* basic builtin wait/notify to take advantage of "monitor
* inflation" in JVMs that we would otherwise need to emulate to
* avoid adding further per-task bookkeeping overhead. We want
* these monitors to be "fat", i.e., not use biasing or thin-lock
* techniques, so use some odd coding idioms that tend to avoid
* them.
* values until completed, upon which status (anded with
* DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks
* undergoing blocking waits by other threads have the SIGNAL bit
* set. Completion of a stolen task with SIGNAL set awakens any
* waiters via notifyAll. Even though suboptimal for some
* purposes, we use basic builtin wait/notify to take advantage of
* "monitor inflation" in JVMs that we would otherwise need to
* emulate to avoid adding further per-task bookkeeping overhead.
* We want these monitors to be "fat", i.e., not use biasing or
* thin-lock techniques, so use some odd coding idioms that tend
* to avoid them, mainly by arranging that every synchronized
* block performs a wait, notifyAll or both.
*/
/** The run status of this task */
volatile int status; // accessed directly by pool and workers
static final int NORMAL = 0xfffffffc; // negative with low 2 bits 0
static final int CANCELLED = 0xfffffff8; // must be < NORMAL
static final int EXCEPTIONAL = 0xfffffff4; // must be < CANCELLED
static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
static final int NORMAL = 0xf0000000; // must be negative
static final int CANCELLED = 0xc0000000; // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
static final int SIGNAL = 0x00000001;
static final int MARKED = 0x00000002;
/**
* Marks completion and wakes up threads waiting to join this
* task, also clearing signal request bits. A specialization for
* NORMAL completion is in method doExec.
* task. A specialization for NORMAL completion is in method
* doExec.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
@ -246,7 +237,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
for (int s;;) {
if ((s = status) < 0)
return s;
if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|completion)) {
if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
if ((s & SIGNAL) != 0)
synchronized (this) { notifyAll(); }
return completion;
@ -270,7 +261,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
return setExceptionalCompletion(rex);
}
while ((s = status) >= 0 && completed) {
if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|NORMAL)) {
if (U.compareAndSwapInt(this, STATUS, s, s | NORMAL)) {
if ((s & SIGNAL) != 0)
synchronized (this) { notifyAll(); }
return NORMAL;
@ -280,47 +271,58 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
return s;
}
/**
* Tries to set SIGNAL status. Used by ForkJoinPool. Other
* variants are directly incorporated into externalAwaitDone etc.
*
* @return true if successful
*/
final boolean trySetSignal() {
int s;
return U.compareAndSwapInt(this, STATUS, s = status, s | SIGNAL);
}
/**
* Blocks a non-worker-thread until completion.
* @return status upon completion
*/
private int externalAwaitDone() {
boolean interrupted = false;
int s;
if ((s = status) >= 0) {
boolean interrupted = false;
synchronized (this) {
while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
wait();
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll();
}
}
if (interrupted)
Thread.currentThread().interrupt();
}
if (interrupted)
Thread.currentThread().interrupt();
return s;
}
/**
* Blocks a non-worker-thread until completion or interruption or timeout.
* Blocks a non-worker-thread until completion or interruption.
*/
private int externalInterruptibleAwaitDone(long millis)
throws InterruptedException {
private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
if (Thread.interrupted())
throw new InterruptedException();
if ((s = status) >= 0) {
synchronized (this) {
while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
wait(millis);
if (millis > 0L)
break;
}
while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
wait();
else
notifyAll();
}
}
}
@ -331,80 +333,37 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
* unfork+exec. Others are relayed to awaitJoin.
* unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
*
* @return status upon completion
*/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
if ((s = status) >= 0) {
if (!((t = Thread.currentThread()) instanceof ForkJoinWorkerThread))
if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) || (s = doExec()) >= 0)
s = wt.pool.awaitJoin(w, this);
}
else
s = externalAwaitDone();
else if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) || (s = doExec()) >= 0)
s = awaitJoin(w, wt.pool);
}
return s;
}
/**
* Helps and/or blocks until joined.
*
* @param w the joiner
* @param p the pool
* @return status upon completion
*/
private int awaitJoin(ForkJoinPool.WorkQueue w, ForkJoinPool p) {
int s;
ForkJoinTask<?> prevJoin = w.currentJoin;
w.currentJoin = this;
for (int k = HELP_RETRIES; (s = status) >= 0;) {
if ((w.queueSize() > 0) ?
w.tryRemoveAndExec(this) : // self-help
p.tryHelpStealer(w, this)) // help process tasks
k = HELP_RETRIES; // reset if made progress
else if ((s = status) < 0) // recheck
break;
else if (--k > 0) {
if ((k & 3) == 1)
Thread.yield(); // occasionally yield
}
else if (k == 0)
p.tryPollForAndExec(w, this); // uncommon self-help case
else if (p.tryCompensate()) { // true if can block
try {
int ss = status;
if (ss >= 0 && // assert need signal
U.compareAndSwapInt(this, STATUS, ss, ss | SIGNAL)) {
synchronized (this) {
if (status >= 0) // block
wait();
}
}
} catch (InterruptedException ignore) {
} finally {
p.incrementActiveCount(); // re-activate
}
}
}
w.currentJoin = prevJoin;
return s;
}
/**
* Implementation for invoke, quietlyInvoke.
*
* @return status upon completion
*/
private int doInvoke() {
int s; Thread t;
int s; Thread t; ForkJoinWorkerThread wt;
if ((s = doExec()) >= 0) {
if (!((t = Thread.currentThread()) instanceof ForkJoinWorkerThread))
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
s = (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue,
this);
else
s = externalAwaitDone();
else {
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
s = awaitJoin(wt.workQueue, wt.pool);
}
}
return s;
}
@ -541,7 +500,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the exception, or null if none
*/
private Throwable getThrowableException() {
if (status != EXCEPTIONAL)
if ((status & DONE_MASK) != EXCEPTIONAL)
return null;
int h = System.identityHashCode(this);
ExceptionNode e;
@ -626,16 +585,14 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
* Report the result of invoke or join; called only upon
* non-normal return of internal versions.
* Throws exception, if any, associated with the given status.
*/
private V reportResult() {
int s; Throwable ex;
if ((s = status) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
private void reportException(int s) {
Throwable ex = ((s == CANCELLED) ? new CancellationException() :
(s == EXCEPTIONAL) ? getThrowableException() :
null);
if (ex != null)
U.throwException(ex);
return getRawResult();
}
// public methods
@ -659,9 +616,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask<V> fork() {
ForkJoinWorkerThread wt;
(wt = (ForkJoinWorkerThread)Thread.currentThread()).
workQueue.push(this, wt.pool);
((ForkJoinWorkerThread)Thread.currentThread()).workQueue.push(this);
return this;
}
@ -677,10 +632,10 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the computed result
*/
public final V join() {
if (doJoin() != NORMAL)
return reportResult();
else
return getRawResult();
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
/**
@ -692,10 +647,10 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the computed result
*/
public final V invoke() {
if (doInvoke() != NORMAL)
return reportResult();
else
return getRawResult();
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
/**
@ -722,9 +677,12 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @throws NullPointerException if any task is null
*/
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
t2.fork();
t1.invoke();
t2.join();
if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
t1.reportException(s1);
if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
t2.reportException(s2);
}
/**
@ -861,7 +819,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
return setCompletion(CANCELLED) == CANCELLED;
return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}
public final boolean isDone() {
@ -869,7 +827,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
public final boolean isCancelled() {
return status == CANCELLED;
return (status & DONE_MASK) == CANCELLED;
}
/**
@ -889,7 +847,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* exception and was not cancelled
*/
public final boolean isCompletedNormally() {
return status == NORMAL;
return (status & DONE_MASK) == NORMAL;
}
/**
@ -900,7 +858,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the exception, or {@code null} if none
*/
public final Throwable getException() {
int s = status;
int s = status & DONE_MASK;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() :
getThrowableException());
@ -962,9 +920,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone(0L);
doJoin() : externalInterruptibleAwaitDone();
Throwable ex;
if (s == CANCELLED)
if ((s &= DONE_MASK) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex);
@ -987,52 +945,60 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
// Messy in part because we measure in nanos, but wait in millis
int s; long millis, nanos;
Thread t = Thread.currentThread();
if (!(t instanceof ForkJoinWorkerThread)) {
if ((millis = unit.toMillis(timeout)) > 0L)
s = externalInterruptibleAwaitDone(millis);
else
s = status;
}
else if ((s = status) >= 0 && (nanos = unit.toNanos(timeout)) > 0L) {
long deadline = System.nanoTime() + nanos;
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
ForkJoinPool.WorkQueue w = wt.workQueue;
ForkJoinPool p = wt.pool;
if (w.tryUnpush(this))
doExec();
boolean blocking = false;
if (Thread.interrupted())
throw new InterruptedException();
// Messy in part because we measure in nanosecs, but wait in millisecs
int s; long ns, ms;
if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) {
long deadline = System.nanoTime() + ns;
ForkJoinPool p = null;
ForkJoinPool.WorkQueue w = null;
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
p = wt.pool;
w = wt.workQueue;
s = p.helpJoinOnce(w, this); // no retries on failure
}
boolean canBlock = false;
boolean interrupted = false;
try {
while ((s = status) >= 0) {
if (w.runState < 0)
if (w != null && w.runState < 0)
cancelIgnoringExceptions(this);
else if (!blocking)
blocking = p.tryCompensate();
else if (!canBlock) {
if (p == null || p.tryCompensate(this, null))
canBlock = true;
}
else {
millis = TimeUnit.NANOSECONDS.toMillis(nanos);
if (millis > 0L &&
if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
try {
synchronized (this) {
if (status >= 0)
wait(millis);
synchronized (this) {
if (status >= 0) {
try {
wait(ms);
} catch (InterruptedException ie) {
if (p == null)
interrupted = true;
}
}
} catch (InterruptedException ie) {
else
notifyAll();
}
}
if ((s = status) < 0 ||
(nanos = deadline - System.nanoTime()) <= 0L)
if ((s = status) < 0 || interrupted ||
(ns = deadline - System.nanoTime()) <= 0L)
break;
}
}
} finally {
if (blocking)
if (p != null && canBlock)
p.incrementActiveCount();
}
if (interrupted)
throw new InterruptedException();
}
if (s != NORMAL) {
if ((s &= DONE_MASK) != NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
@ -1099,7 +1065,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* setRawResult(null)}.
*/
public void reinitialize() {
if (status == EXCEPTIONAL)
if ((status & DONE_MASK) == EXCEPTIONAL)
clearExceptionalCompletion();
else
status = 0;
@ -1387,21 +1353,33 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
static final class AdaptedRunnable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
final Runnable runnable;
final T resultOnCompletion;
T result;
AdaptedRunnable(Runnable runnable, T result) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
this.resultOnCompletion = result;
this.result = result; // OK to set this even before completion
}
public T getRawResult() { return result; }
public void setRawResult(T v) { result = v; }
public boolean exec() {
runnable.run();
result = resultOnCompletion;
return true;
public final T getRawResult() { return result; }
public final void setRawResult(T v) { result = v; }
public final boolean exec() { runnable.run(); return true; }
public final void run() { invoke(); }
private static final long serialVersionUID = 5232453952276885070L;
}
/**
* Adaptor for Runnables without results
*/
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
implements RunnableFuture<Void> {
final Runnable runnable;
AdaptedRunnableAction(Runnable runnable) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
}
public void run() { invoke(); }
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) { }
public final boolean exec() { runnable.run(); return true; }
public final void run() { invoke(); }
private static final long serialVersionUID = 5232453952276885070L;
}
@ -1416,9 +1394,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if (callable == null) throw new NullPointerException();
this.callable = callable;
}
public T getRawResult() { return result; }
public void setRawResult(T v) { result = v; }
public boolean exec() {
public final T getRawResult() { return result; }
public final void setRawResult(T v) { result = v; }
public final boolean exec() {
try {
result = callable.call();
return true;
@ -1430,7 +1408,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
throw new RuntimeException(ex);
}
}
public void run() { invoke(); }
public final void run() { invoke(); }
private static final long serialVersionUID = 2838392045355241008L;
}
@ -1443,7 +1421,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the task
*/
public static ForkJoinTask<?> adapt(Runnable runnable) {
return new AdaptedRunnable<Void>(runnable, null);
return new AdaptedRunnableAction(runnable);
}
/**

View file

@ -43,8 +43,8 @@ public class ForkJoinWorkerThread extends Thread {
if (ueh != null)
setUncaughtExceptionHandler(ueh);
this.pool = pool;
this.workQueue = new ForkJoinPool.WorkQueue(this, pool.localMode);
pool.registerWorker(this);
pool.registerWorker(this.workQueue = new ForkJoinPool.WorkQueue
(pool, this, pool.localMode));
}
/**
@ -101,7 +101,7 @@ public class ForkJoinWorkerThread extends Thread {
Throwable exception = null;
try {
onStart();
pool.runWorker(this);
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {

View file

@ -418,7 +418,7 @@ public interface Config extends ConfigMergeable {
* units suffixes like "10m" or "5ns" as documented in the <a
* href="https://github.com/typesafehub/config/blob/master/HOCON.md">the
* spec</a>.
*
*
* @param path
* path expression
* @return the duration value at the requested path, in milliseconds
@ -487,4 +487,23 @@ public interface Config extends ConfigMergeable {
List<Long> getMillisecondsList(String path);
List<Long> getNanosecondsList(String path);
/**
* Clone the config with only the given path (and its children) retained;
* all sibling paths are removed.
*
* @param path
* path to keep
* @return a copy of the config minus all paths except the one specified
*/
Config withOnlyPath(String path);
/**
* Clone the config with the given path removed.
*
* @param path
* path to remove
* @return a copy of the config minus the specified path
*/
Config withoutPath(String path);
}
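
The two methods documented above return a modified copy of the Config rather than mutating it. A minimal hedged sketch of how they would be used (the config contents here are invented purely for illustration):

import com.typesafe.config.ConfigFactory

val conf = ConfigFactory.parseString("a.b = 1\na.c = 2\nd = 3")

val onlyA = conf.withOnlyPath("a")   // keeps a.b and a.c, drops d
val noAB  = conf.withoutPath("a.b")  // keeps a.c and d, drops a.b

onlyA.hasPath("d")    // false
noAB.hasPath("a.b")   // false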

View file

@ -91,4 +91,23 @@ public interface ConfigObject extends ConfigValue, Map<String, ConfigValue> {
*/
@Override
ConfigValue get(Object key);
/**
* Clone the object with only the given key (and its children) retained; all
* sibling keys are removed.
*
* @param key
* key to keep
* @return a copy of the object minus all keys except the one specified
*/
ConfigObject withOnlyKey(String key);
/**
* Clone the object with the given key removed.
*
* @param key
* key to remove
* @return a copy of the object minus the specified key
*/
ConfigObject withoutKey(String key);
}
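
withOnlyKey and withoutKey are the ConfigObject-level counterparts of Config.withOnlyPath/withoutPath shown earlier in this diff; they operate on a single key rather than a dotted path. A brief hedged sketch (config text again invented for illustration):

import com.typesafe.config.ConfigFactory

val obj = ConfigFactory.parseString("a.b = 1\nd = 3").root()  // root() yields a ConfigObject

obj.withOnlyKey("a").keySet()   // contains only "a"
obj.withoutKey("a").keySet()    // contains only "d"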

View file

@ -43,6 +43,18 @@ abstract class AbstractConfigObject extends AbstractConfigValue implements
return this;
}
@Override
abstract public AbstractConfigObject withOnlyKey(String key);
@Override
abstract public AbstractConfigObject withoutKey(String key);
abstract protected AbstractConfigObject withOnlyPathOrNull(Path path);
abstract AbstractConfigObject withOnlyPath(Path path);
abstract AbstractConfigObject withoutPath(Path path);
/**
* This looks up the key with no transformation or type conversion of any
* kind, and returns null if the key is not present.

View file

@ -25,6 +25,8 @@ import com.typesafe.config.ConfigValueType;
final class ConfigDelayedMerge extends AbstractConfigValue implements
Unmergeable {
private static final long serialVersionUID = 1L;
// earlier items in the stack win
final private List<AbstractConfigValue> stack;
final private boolean ignoresFallbacks;

View file

@ -18,9 +18,11 @@ import com.typesafe.config.ConfigValue;
// This is just like ConfigDelayedMerge except we know statically
// that it will turn out to be an object.
class ConfigDelayedMergeObject extends AbstractConfigObject implements
final class ConfigDelayedMergeObject extends AbstractConfigObject implements
Unmergeable {
private static final long serialVersionUID = 1L;
final private List<AbstractConfigValue> stack;
final private boolean ignoresFallbacks;
@ -111,6 +113,31 @@ class ConfigDelayedMergeObject extends AbstractConfigObject implements
return (ConfigDelayedMergeObject) super.withFallback(mergeable);
}
@Override
public ConfigDelayedMergeObject withOnlyKey(String key) {
throw notResolved();
}
@Override
public ConfigDelayedMergeObject withoutKey(String key) {
throw notResolved();
}
@Override
protected AbstractConfigObject withOnlyPathOrNull(Path path) {
throw notResolved();
}
@Override
AbstractConfigObject withOnlyPath(Path path) {
throw notResolved();
}
@Override
AbstractConfigObject withoutPath(Path path) {
throw notResolved();
}
@Override
public Collection<AbstractConfigValue> unmergedValues() {
return stack;

View file

@ -826,4 +826,16 @@ final class SimpleConfig implements Config, MergeableValue, Serializable {
throw new ConfigException.ValidationFailed(problems);
}
}
@Override
public SimpleConfig withOnlyPath(String pathExpression) {
Path path = Path.newPath(pathExpression);
return new SimpleConfig(root().withOnlyPath(path));
}
@Override
public SimpleConfig withoutPath(String pathExpression) {
Path path = Path.newPath(pathExpression);
return new SimpleConfig(root().withoutPath(path));
}
}

View file

@ -18,6 +18,8 @@ import com.typesafe.config.ConfigValueType;
final class SimpleConfigList extends AbstractConfigValue implements ConfigList {
private static final long serialVersionUID = 1L;
final private List<AbstractConfigValue> value;
final private boolean resolved;

View file

@ -41,6 +41,83 @@ final class SimpleConfigObject extends AbstractConfigObject {
this(origin, value, ResolveStatus.fromValues(value.values()), false /* ignoresFallbacks */);
}
@Override
public SimpleConfigObject withOnlyKey(String key) {
return withOnlyPath(Path.newKey(key));
}
@Override
public SimpleConfigObject withoutKey(String key) {
return withoutPath(Path.newKey(key));
}
// gets the object with only the path if the path
// exists, otherwise null if it doesn't. this ensures
// that if we have { a : { b : 42 } } and do
// withOnlyPath("a.b.c") that we don't keep an empty
// "a" object.
@Override
protected SimpleConfigObject withOnlyPathOrNull(Path path) {
String key = path.first();
Path next = path.remainder();
AbstractConfigValue v = value.get(key);
if (next != null) {
if (v != null && (v instanceof AbstractConfigObject)) {
v = ((AbstractConfigObject) v).withOnlyPathOrNull(next);
} else {
// if the path has more elements but we don't have an object,
// then the rest of the path does not exist.
v = null;
}
}
if (v == null) {
return null;
} else {
return new SimpleConfigObject(origin(), Collections.singletonMap(key, v),
resolveStatus(), ignoresFallbacks);
}
}
@Override
SimpleConfigObject withOnlyPath(Path path) {
SimpleConfigObject o = withOnlyPathOrNull(path);
if (o == null) {
return new SimpleConfigObject(origin(),
Collections.<String, AbstractConfigValue> emptyMap(), resolveStatus(),
ignoresFallbacks);
} else {
return o;
}
}
@Override
SimpleConfigObject withoutPath(Path path) {
String key = path.first();
Path next = path.remainder();
AbstractConfigValue v = value.get(key);
if (v != null && next != null && v instanceof AbstractConfigObject) {
v = ((AbstractConfigObject) v).withoutPath(next);
Map<String, AbstractConfigValue> updated = new HashMap<String, AbstractConfigValue>(
value);
updated.put(key, v);
return new SimpleConfigObject(origin(), updated, resolveStatus(), ignoresFallbacks);
} else if (next != null || v == null) {
// can't descend, nothing to remove
return this;
} else {
Map<String, AbstractConfigValue> smaller = new HashMap<String, AbstractConfigValue>(
value.size() - 1);
for (Map.Entry<String, AbstractConfigValue> old : value.entrySet()) {
if (!old.getKey().equals(key))
smaller.put(old.getKey(), old.getValue());
}
return new SimpleConfigObject(origin(), smaller, resolveStatus(), ignoresFallbacks);
}
}
@Override
protected AbstractConfigValue peek(String key) {
return value.get(key);

View file

@ -2,7 +2,7 @@
# Akka Actor Reference Config File #
####################################
# This the reference config file has all the default settings.
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.
akka {

View file

@ -93,7 +93,15 @@ trait ActorContext extends ActorRefFactory {
def sender: ActorRef
/**
* Returns all supervised children.
* Returns all supervised children; this method returns a view onto the
* internal collection of children. Targeted lookups should be using
* `actorFor` instead for performance reasons:
*
* {{{
* val badLookup = context.children find (_.path.name == "kid")
* // should better be expressed as:
* val goodLookup = context.actorFor("kid")
* }}}
*/
def children: Iterable[ActorRef]
@ -381,7 +389,6 @@ private[akka] class ActorCell(
def recreate(cause: Throwable): Unit = try {
val failedActor = actor
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
val freshActor = newActor()
if (failedActor ne null) {
val c = currentMessage //One read only plz
try {
@ -390,7 +397,8 @@ private[akka] class ActorCell(
clearActorFields()
}
}
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
val freshActor = newActor() // this must happen after failedActor.preRestart (to scrap those children)
actor = freshActor // this must happen before postRestart has a chance to fail
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted"))

View file

@ -380,6 +380,18 @@ class LocalActorRefProvider(
}
}
/**
* Overridable supervision strategy to be used by the /user guardian.
*/
protected def guardianSupervisionStrategy = {
import akka.actor.SupervisorStrategy._
OneForOneStrategy() {
case _: ActorKilledException ⇒ Stop
case _: ActorInitializationException ⇒ Stop
case _: Exception ⇒ Restart
}
}
/*
* Guardians can be asked by ActorSystem to create children, i.e. top-level
* actors. Therefore these need to answer to these requests, forwarding any
@ -387,14 +399,7 @@ class LocalActorRefProvider(
*/
private class Guardian extends Actor {
override val supervisorStrategy = {
import akka.actor.SupervisorStrategy._
OneForOneStrategy() {
case _: ActorKilledException ⇒ Stop
case _: ActorInitializationException ⇒ Stop
case _: Exception ⇒ Restart
}
}
override val supervisorStrategy = guardianSupervisionStrategy
def receive = {
case Terminated(_) ⇒ context.stop(self)
@ -408,12 +413,27 @@ class LocalActorRefProvider(
override def preRestart(cause: Throwable, msg: Option[Any]) {}
}
/**
* Overridable supervision strategy to be used by the /system guardian.
*/
protected def systemGuardianSupervisionStrategy = {
import akka.actor.SupervisorStrategy._
OneForOneStrategy() {
case _: ActorKilledException ⇒ Stop
case _: ActorInitializationException ⇒ Stop
case _: Exception ⇒ Restart
}
}
/*
* Guardians can be asked by ActorSystem to create children, i.e. top-level
* actors. Therefore these need to answer to these requests, forwarding any
* exceptions which might have occurred.
*/
private class SystemGuardian extends Actor {
override val supervisorStrategy = systemGuardianSupervisionStrategy
def receive = {
case Terminated(_) ⇒
eventStream.stopDefaultLoggers()

View file

@ -182,12 +182,12 @@ abstract class ActorSystem extends ActorRefFactory {
/**
* Start-up time in milliseconds since the epoch.
*/
val startTime = System.currentTimeMillis
val startTime: Long = System.currentTimeMillis
/**
* Up-time of this actor system in seconds.
*/
def uptime = (System.currentTimeMillis - startTime) / 1000
def uptime: Long = (System.currentTimeMillis - startTime) / 1000
/**
* Main event bus of this actor system, used for example for logging.
@ -253,6 +253,8 @@ abstract class ActorSystem extends ActorRefFactory {
* Block current thread until the system has been shutdown, or the specified
* timeout has elapsed. This will block until after all on termination
* callbacks have been run.
*
* @throws TimeoutException in case of timeout
*/
def awaitTermination(timeout: Duration): Unit
@ -270,6 +272,15 @@ abstract class ActorSystem extends ActorRefFactory {
*/
def shutdown(): Unit
/**
* Query the termination status: if it returns true, all callbacks have run
* and the ActorSystem has been fully stopped, i.e.
* `awaitTermination(0 seconds)` would return normally. If this method
* returns `false`, the status is actually unknown, since it might have
* changed since you queried it.
*/
def isTerminated: Boolean
/**
* Registers the provided extension and creates its payload, if this extension isn't already registered
* This method has putIfAbsent-semantics, this method can potentially block, waiting for the initialization
@ -452,15 +463,19 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf
def deadLetters: ActorRef = provider.deadLetters
val deadLetterMailbox: Mailbox = new Mailbox(null) {
val deadLetterQueue: MessageQueue = new MessageQueue {
def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) }
def dequeue() = null
def hasMessages = false
def numberOfMessages = 0
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
}
val deadLetterMailbox: Mailbox = new Mailbox(null, deadLetterQueue) {
becomeClosed()
override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) }
override def dequeue() = null
override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) }
override def systemDrain(): SystemMessage = null
override def hasMessages = false
override def hasSystemMessages = false
override def numberOfMessages = 0
def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = deadLetters ! DeadLetter(handle, receiver, receiver)
def systemDrain(): SystemMessage = null
def hasSystemMessages = false
}
def locker: Locker = provider.locker
@ -499,8 +514,9 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf
def registerOnTermination(code: Runnable) { terminationCallbacks.add(code) }
def awaitTermination(timeout: Duration) { Await.ready(terminationCallbacks, timeout) }
def awaitTermination() = awaitTermination(Duration.Inf)
def isTerminated = terminationCallbacks.isTerminated
def shutdown(): Unit = stop(guardian)
def shutdown(): Unit = guardian.stop()
/**
* Create the scheduler service. This one needs one special behavior: if
@ -634,5 +650,7 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf
}
final def result(atMost: Duration)(implicit permit: CanAwait): Unit = ready(atMost)
final def isTerminated: Boolean = latch.getCount == 0
}
}
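
The new isTerminated member documented above complements shutdown() and awaitTermination(); a minimal usage sketch mirroring the "return isTerminated status correctly" test earlier in this commit:

import akka.actor.ActorSystem

val system = ActorSystem()      // as in the spec; a name could also be supplied
system.isTerminated             // false while the system is running
system.shutdown()               // asynchronous, returns immediately
system.awaitTermination()       // blocks until all termination callbacks have run
system.isTerminated             // true once fully stopped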

View file

@ -135,7 +135,7 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
* Decider builder which just checks whether one of
* the given Throwables matches the cause and restarts, otherwise escalates.
*/
def makeDecider(trapExit: Array[Class[_ <: Throwable]]): Decider =
def makeDecider(trapExit: Array[Class[_]]): Decider =
{ case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate }
/**
@ -248,7 +248,7 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_ <: Throwable]]) =
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
/*
@ -260,10 +260,7 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
SupervisorStrategy.maxNrOfRetriesOption(maxNrOfRetries),
SupervisorStrategy.withinTimeRangeOption(withinTimeRange).map(_.toMillis.toInt))
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {
children foreach (context.stop(_))
//TODO optimization to drop all children here already?
}
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {}
def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
if (children.nonEmpty) {
@ -294,7 +291,7 @@ case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_ <: Throwable]]) =
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
/*

View file

@ -62,7 +62,7 @@ trait Stash extends Actor {
* `mailbox.queue` is the underlying `Deque`.
*/
private val mailbox: DequeBasedMessageQueue = {
context.asInstanceOf[ActorCell].mailbox match {
context.asInstanceOf[ActorCell].mailbox.messageQueue match {
case queue: DequeBasedMessageQueue ⇒ queue
case other ⇒ throw new ActorInitializationException(self, "DequeBasedMailbox required, got: " + other.getClass() + """
An (unbounded) deque-based mailbox can be configured as follows:
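
The rest of that message is cut off by the diff context; a hedged sketch of the kind of configuration it refers to, modelled on the testConf used by ActorWithStashSpec earlier in this commit (the dispatcher name is illustrative):

// Dispatcher with a deque-based mailbox, suitable for actors mixing in Stash
val stashDispatcherConf = """
  my-dispatcher {
    mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
  }
"""
// Actors using Stash are then started with Props(...).withDispatcher("my-dispatcher"),
// as the spec's myProps helper does.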

View file

@ -128,11 +128,11 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
case null ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null)
case ps if ps.length == 0 ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array())
case ps ⇒
val serialization = SerializationExtension(akka.serialization.JavaSerializer.currentSystem.value)
val serializedParameters = Array.ofDim[(Int, Class[_], Array[Byte])](ps.length)
for (i ← 0 until ps.length) {
val p = ps(i)
val system = akka.serialization.JavaSerializer.currentSystem.value
val s = SerializationExtension(system).findSerializerFor(p)
val s = serialization.findSerializerFor(p)
val m = if (s.includeManifest) p.getClass else null
serializedParameters(i) = (s.identifier, m, s toBinary parameters(i)) //Mutable for the sake of sanity
}
@ -256,33 +256,40 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
case _ ⇒ super.postRestart(reason)
}
protected def withContext[T](unitOfWork: T): T = {
TypedActor.selfReference set proxyVar.get
TypedActor.currentContext set context
try unitOfWork finally {
TypedActor.selfReference set null
TypedActor.currentContext set null
}
}
def receive = {
case m: MethodCall ⇒
TypedActor.selfReference set proxyVar.get
TypedActor.currentContext set context
try {
if (m.isOneWay) m(me)
else {
try {
if (m.returnsFuture_?) {
val s = sender
m(me).asInstanceOf[Future[Any]] onComplete {
case Left(f) ⇒ s ! Status.Failure(f)
case Right(r) ⇒ s ! r
}
} else {
sender ! m(me)
case m: MethodCall ⇒ withContext {
if (m.isOneWay) m(me)
else {
try {
if (m.returnsFuture_?) {
val s = sender
m(me).asInstanceOf[Future[Any]] onComplete {
case Left(f) ⇒ s ! Status.Failure(f)
case Right(r) ⇒ s ! r
}
} catch {
case NonFatal(e) ⇒
sender ! Status.Failure(e)
throw e
} else {
sender ! m(me)
}
} catch {
case NonFatal(e) ⇒
sender ! Status.Failure(e)
throw e
}
} finally {
TypedActor.selfReference set null
TypedActor.currentContext set null
}
}
case msg if me.isInstanceOf[Receiver] ⇒ withContext {
me.asInstanceOf[Receiver].onReceive(msg, sender)
}
}
}
@ -297,6 +304,13 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
def supervisorStrategy(): SupervisorStrategy
}
/**
* Mix this into your TypedActor to be able to intercept Terminated messages
*/
trait Receiver {
def onReceive(message: Any, sender: ActorRef): Unit
}
/**
* Mix this into your TypedActor to be able to hook into its lifecycle
*/
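
A hedged sketch of how the new Receiver trait above could be mixed into a TypedActor implementation, modelled on the LifeCyclesImpl change in TypedActorSpec earlier in this commit (the Echo interface and all names are illustrative; Receiver is assumed to live in object TypedActor as the diff shows):

import akka.actor.ActorRef
import akka.actor.TypedActor.Receiver

trait Echo { def ping(): String }

class EchoImpl extends Echo with Receiver {
  def ping(): String = "pong"
  // Invoked for plain messages sent to the underlying ActorRef, i.e. anything that is not a MethodCall
  override def onReceive(msg: Any, sender: ActorRef): Unit = msg match {
    case "pigdog" ⇒ sender ! "dogpig"   // same round-trip the spec checks
    case _        ⇒ ()
  }
}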

View file

@ -383,12 +383,10 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
def mailboxType(): MailboxType = {
config.getString("mailbox-type") match {
case ""
val capacity = config.getInt("mailbox-capacity")
if (capacity < 1) UnboundedMailbox()
else {
val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS)
BoundedMailbox(capacity, duration)
}
if (config.getInt("mailbox-capacity") < 1) UnboundedMailbox()
else new BoundedMailbox(config)
case "unbounded" UnboundedMailbox()
case "bounded" new BoundedMailbox(config)
case fqcn
val args = Seq(classOf[Config] -> config)
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match {

View file

@ -40,47 +40,9 @@ class BalancingDispatcher(
def compare(l: ActorCell, r: ActorCell) = l.self.path compareTo r.self.path
}))
val messageQueue: MessageQueue = mailboxType match {
case UnboundedMailbox() ⇒
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]
}
val messageQueue: MessageQueue = mailboxType.create(None)
case BoundedMailbox(cap, timeout) ⇒
new QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final val queue = new LinkedBlockingQueue[Envelope](cap)
final val pushTimeOut = timeout
}
case other ⇒ throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]")
}
protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor)
class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue {
final def enqueue(receiver: ActorRef, handle: Envelope) = messageQueue.enqueue(receiver, handle)
final def dequeue(): Envelope = messageQueue.dequeue()
final def numberOfMessages: Int = messageQueue.numberOfMessages
final def hasMessages: Boolean = messageQueue.hasMessages
override def cleanUp(): Unit = {
//Don't call the original implementation of this since it scraps all messages, and we don't want to do that
if (hasSystemMessages) {
val dlq = actor.systemImpl.deadLetterMailbox
var message = systemDrain()
while (message ne null) {
// message must be virgin before being able to systemEnqueue again
val next = message.next
message.next = null
dlq.systemEnqueue(actor.self, message)
message = next
}
}
}
}
protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor, messageQueue)
protected[akka] override def register(actor: ActorCell): Unit = {
super.register(actor)
@ -111,4 +73,23 @@ class BalancingDispatcher(
scheduleOne()
}
}
}
class SharingMailbox(_actor: ActorCell, _messageQueue: MessageQueue)
extends Mailbox(_actor, _messageQueue) with DefaultSystemMessageQueue {
override def cleanUp(): Unit = {
//Don't call the original implementation of this since it scraps all messages, and we don't want to do that
if (hasSystemMessages) {
val dlq = actor.systemImpl.deadLetterMailbox
var message = systemDrain()
while (message ne null) {
// message must be virgin before being able to systemEnqueue again
val next = message.next
message.next = null
dlq.systemEnqueue(actor.self, message)
message = next
}
}
}
}

View file

@ -65,7 +65,7 @@ class Dispatcher(
}
}
protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(actor)
protected[akka] def createMailbox(actor: ActorCell): Mailbox = new Mailbox(actor, mailboxType.create(Some(actor))) with DefaultSystemMessageQueue
protected[akka] def shutdown: Unit =
Option(executorService.getAndSet(new ExecutorServiceDelegate {

View file

@ -39,12 +39,14 @@ object Await {
* Should throw [[java.util.concurrent.TimeoutException]] if times out
* This method should not be called directly.
*/
@throws(classOf[TimeoutException])
def ready(atMost: Duration)(implicit permit: CanAwait): this.type
/**
* Throws exceptions if cannot produce a T within the specified time
* This method should not be called directly.
*/
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T
}
@ -57,6 +59,7 @@ object Await {
* @throws [[java.util.concurrent.TimeoutException]] if times out
* @return The returned value as returned by Awaitable.ready
*/
@throws(classOf[TimeoutException])
def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T = awaitable.ready(atMost)
/**
@ -66,6 +69,7 @@ object Await {
* @throws [[java.util.concurrent.TimeoutException]] if times out
* @return The returned value as returned by Awaitable.result
*/
@throws(classOf[Exception])
def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost)
}
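// Hedged usage sketch (illustration only, not part of this diff): exercising the Await API
// annotated above. Value names are hypothetical; Future.apply assumes an implicit
// ExecutionContext in scope.
object AwaitUsageSketch {
  import akka.dispatch.{ Await, ExecutionContext, Future }
  import akka.util.Duration
  import java.util.concurrent.TimeUnit

  def demo(implicit ec: ExecutionContext): String = {
    val f: Future[String] = Future { "computed elsewhere" }
    // throws java.util.concurrent.TimeoutException if not completed within 5 seconds
    Await.result(f, Duration(5, TimeUnit.SECONDS))
  }
}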
@ -156,9 +160,9 @@ object Futures {
/**
* Signals that the current thread of execution will potentially engage
* in blocking calls after the call to this method, giving the system a
* chance to spawn new threads, reuse old threads or otherwise, to prevent
* starvation and/or unfairness.
* an action that will take a non-trivial amount of time, perhaps by using blocking.IO or using a lot of CPU time,
* giving the system a chance to spawn new threads, reuse old threads or otherwise,
* to prevent starvation and/or unfairness.
*
* Assures that any Future tasks initiated in the current thread will be
* executed asynchronously, including any tasks currently queued to be
@ -312,9 +316,9 @@ object Future {
/**
* Signals that the current thread of execution will potentially engage
* in blocking calls after the call to this method, giving the system a
* chance to spawn new threads, reuse old threads or otherwise, to prevent
* starvation and/or unfairness.
* an action that will take a non-trivial amount of time, perhaps by using blocking.IO or using a lot of CPU time,
* giving the system a chance to spawn new threads, reuse old threads or otherwise,
* to prevent starvation and/or unfairness.
*
* Assures that any Future tasks initiated in the current thread will be
* executed asynchronously, including any tasks currently queued to be
@ -819,10 +823,12 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac
awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)
}
@throws(classOf[TimeoutException])
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
if (isCompleted || tryAwait(atMost)) this
else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get match {
case Left(e: AskTimeoutException) ⇒ throw new AskTimeoutException(e.getMessage, e) // to get meaningful stack trace

View file

@ -35,26 +35,21 @@ object Mailbox {
final val debug = false
}
/**
* Custom mailbox implementations are implemented by extending this class.
* E.g.
* <pre><code>
* class MyMailbox(owner: ActorContext) extends CustomMailbox(owner)
* with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
* val queue = new ConcurrentLinkedQueue[Envelope]()
* }
* </code></pre>
*/
abstract class CustomMailbox(val actorContext: ActorContext) extends Mailbox(actorContext.asInstanceOf[ActorCell])
/**
* Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation,
* but can't be exposed to user defined mailbox subclasses.
*
*/
private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable {
private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: MessageQueue)
extends SystemMessageQueue with Runnable {
import Mailbox._
def enqueue(receiver: ActorRef, msg: Envelope): Unit = messageQueue.enqueue(receiver, msg)
def dequeue(): Envelope = messageQueue.dequeue()
def hasMessages: Boolean = messageQueue.hasMessages
def numberOfMessages: Int = messageQueue.numberOfMessages
@volatile
protected var _statusDoNotCallMeDirectly: Status = _ //0 by default
@ -217,25 +212,20 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue
*/
protected[dispatch] def cleanUp(): Unit =
if (actor ne null) { // actor is null for the deadLetterMailbox
val dlq = actor.systemImpl.deadLetterMailbox
val dlm = actor.systemImpl.deadLetterMailbox
if (hasSystemMessages) {
var message = systemDrain()
while (message ne null) {
// message must be virgin before being able to systemEnqueue again
val next = message.next
message.next = null
dlq.systemEnqueue(actor.self, message)
dlm.systemEnqueue(actor.self, message)
message = next
}
}
if (hasMessages) {
var envelope = dequeue
while (envelope ne null) {
dlq.enqueue(actor.self, envelope)
envelope = dequeue
}
}
if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
messageQueue.cleanUp(actor, actor.systemImpl.deadLetterQueue)
}
}
@ -261,9 +251,20 @@ trait MessageQueue {
* Indicates whether this queue is non-empty.
*/
def hasMessages: Boolean
/**
* Called when the mailbox this queue belongs to is disposed of. Normally it
* is expected to transfer all remaining messages into the dead letter queue
* which is passed in. The owner of this MessageQueue is passed in if
* available (e.g. for creating DeadLetters()), /deadletters otherwise.
*/
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit
}
trait SystemMessageQueue {
/**
* Internal mailbox implementation detail.
*/
private[akka] trait SystemMessageQueue {
/**
* Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list.
*/
@ -277,7 +278,10 @@ trait SystemMessageQueue {
def hasSystemMessages: Boolean
}
trait DefaultSystemMessageQueue { self: Mailbox ⇒
/**
* Internal mailbox implementation detail.
*/
private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
@tailrec
final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
@ -308,8 +312,17 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒
trait QueueBasedMessageQueue extends MessageQueue {
def queue: Queue[Envelope]
final def numberOfMessages = queue.size
final def hasMessages = !queue.isEmpty
def numberOfMessages = queue.size
def hasMessages = !queue.isEmpty
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = {
if (hasMessages) {
var envelope = dequeue
while (envelope ne null) {
deadLetters.enqueue(owner.self, envelope)
envelope = dequeue
}
}
}
}
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {
@ -338,97 +351,113 @@ trait DequeBasedMessageQueue extends QueueBasedMessageQueue {
}
trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
final def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = queue addFirst handle
final def dequeue(): Envelope = queue.poll()
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = queue addFirst handle
def dequeue(): Envelope = queue.poll()
}
trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
def pushTimeOut: Duration
override def queue: BlockingDeque[Envelope]
final def enqueue(receiver: ActorRef, handle: Envelope): Unit =
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0)
queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
else queue put handle
final def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit =
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0)
queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
else queue putFirst handle
final def dequeue(): Envelope = queue.poll()
def dequeue(): Envelope = queue.poll()
}
/**
* Mailbox configuration.
*/
trait MailboxType {
def create(receiver: ActorContext): Mailbox
def create(owner: Option[ActorContext]): MessageQueue
}
/**
* It's a case class for Java (new UnboundedMailbox)
*/
case class UnboundedMailbox() extends MailboxType {
final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]()
def this(config: Config) = this()
final override def create(owner: Option[ActorContext]): MessageQueue =
new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final def queue: Queue[Envelope] = this
}
}
case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
def this(config: Config) = this(config.getInt("mailbox-capacity"),
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new LinkedBlockingQueue[Envelope](capacity)
final override def create(owner: Option[ActorContext]): MessageQueue =
new LinkedBlockingQueue[Envelope](capacity) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final def queue: BlockingQueue[Envelope] = this
final val pushTimeOut = BoundedMailbox.this.pushTimeOut
}
}
case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new PriorityBlockingQueue[Envelope](11, cmp)
/**
* Extend me to provide the comparator
*/
class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
final override def create(owner: Option[ActorContext]): MessageQueue =
new PriorityBlockingQueue[Envelope](11, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final def queue: Queue[Envelope] = this
}
}
case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
/**
* Extend me to provide the comparator
*/
class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
final override def create(owner: Option[ActorContext]): MessageQueue =
new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final def queue: BlockingQueue[Envelope] = this
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
}
}
case class UnboundedDequeBasedMailbox(config: Config) extends MailboxType {
final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new LinkedBlockingDeque[Envelope]()
case class UnboundedDequeBasedMailbox() extends MailboxType {
def this(config: Config) = this()
final override def create(owner: Option[ActorContext]): MessageQueue =
new LinkedBlockingDeque[Envelope]() with DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics {
final val queue = this
}
}
case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
def this(config: Config) = this(config.getInt("mailbox-capacity"),
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new LinkedBlockingDeque[Envelope](capacity)
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedDequeBasedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedDequeBasedMailbox can not be null")
final override def create(owner: Option[ActorContext]): MessageQueue =
new LinkedBlockingDeque[Envelope](capacity) with DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics {
final val queue = this
final val pushTimeOut = BoundedDequeBasedMailbox.this.pushTimeOut
}
}
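// Hedged sketch (illustration only, not part of this diff): a user-defined MailboxType
// against the new create(owner: Option[ActorContext]) signature, mirroring the
// UnboundedMailbox above. The class name is hypothetical; the Config constructor is the
// shape reflective instantiation from configuration expects. Imports match those already
// used in this file (ActorContext, Envelope, Queue, ConcurrentLinkedQueue, Config).
class MyUnboundedMailbox(config: Config) extends MailboxType {
  final override def create(owner: Option[ActorContext]): MessageQueue =
    new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
      final def queue: Queue[Envelope] = this
    }
}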

View file

@ -734,37 +734,37 @@ trait LoggingAdapter {
*/
def error(cause: Throwable, message: String) { if (isErrorEnabled) notifyError(cause, message) }
def error(cause: Throwable, template: String, arg1: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1)) }
def error(cause: Throwable, template: String, arg1: Any) { if (isErrorEnabled) notifyError(cause, format1(template, arg1)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3, arg4)) }
def error(message: String) { if (isErrorEnabled) notifyError(message) }
def error(template: String, arg1: Any) { if (isErrorEnabled) notifyError(format(template, arg1)) }
def error(template: String, arg1: Any) { if (isErrorEnabled) notifyError(format1(template, arg1)) }
def error(template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3, arg4)) }
def warning(message: String) { if (isWarningEnabled) notifyWarning(message) }
def warning(template: String, arg1: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1)) }
def warning(template: String, arg1: Any) { if (isWarningEnabled) notifyWarning(format1(template, arg1)) }
def warning(template: String, arg1: Any, arg2: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3, arg4)) }
def info(message: String) { if (isInfoEnabled) notifyInfo(message) }
def info(template: String, arg1: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1)) }
def info(template: String, arg1: Any) { if (isInfoEnabled) notifyInfo(format1(template, arg1)) }
def info(template: String, arg1: Any, arg2: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3, arg4)) }
def debug(message: String) { if (isDebugEnabled) notifyDebug(message) }
def debug(template: String, arg1: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1)) }
def debug(template: String, arg1: Any) { if (isDebugEnabled) notifyDebug(format1(template, arg1)) }
def debug(template: String, arg1: Any, arg2: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3, arg4)) }
def log(level: Logging.LogLevel, message: String) { if (isEnabled(level)) notifyLog(level, message) }
def log(level: Logging.LogLevel, template: String, arg1: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1)) }
def log(level: Logging.LogLevel, template: String, arg1: Any) { if (isEnabled(level)) notifyLog(level, format1(template, arg1)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3, arg4)) }
@ -783,16 +783,28 @@ trait LoggingAdapter {
case Logging.DebugLevel ⇒ if (isDebugEnabled) notifyDebug(message)
}
private def format1(t: String, arg: Any) = arg match {
case a: Array[_] if !a.getClass.getComponentType.isPrimitive ⇒ format(t, a: _*)
case a: Array[_] ⇒ format(t, (a map (_.asInstanceOf[AnyRef]): _*))
case x ⇒ format(t, x)
}
def format(t: String, arg: Any*) = {
val sb = new StringBuilder
var p = 0
var rest = t
while (p < arg.length) {
val index = rest.indexOf("{}")
sb.append(rest.substring(0, index))
sb.append(arg(p))
rest = rest.substring(index + 2)
p += 1
if (index == -1) {
sb.append(rest).append(" WARNING arguments left: ").append(arg.length - p)
rest = ""
p = arg.length
} else {
sb.append(rest.substring(0, index))
sb.append(arg(p))
rest = rest.substring(index + 2)
p += 1
}
}
sb.append(rest)
sb.toString
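// Hedged usage sketch (illustration only, not part of this diff) of the "{}" template
// handling implemented above; any LoggingAdapter behaves the same way, the values are
// hypothetical.
object LogFormatSketch {
  def demo(system: akka.actor.ActorSystem): Unit = {
    val log = system.log
    log.warning("from {} to {}", "alice", "bob")            // each {} is filled left to right
    log.debug("a: {}, b: {}, c: {}", Array("a", "b", "c"))  // a single array argument is expanded by format1
    log.info("only one {} here", "x", "y")                  // surplus arguments are appended with a WARNING note
  }
}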

View file

@ -191,9 +191,9 @@ trait AskSupport {
val result = Promise[Any]()(provider.dispatcher)
val a = new PromiseActorRef(provider, path, provider.tempContainer, result, provider.deathWatch)
provider.registerTempActor(a, path)
val f = provider.scheduler.scheduleOnce(timeout.duration) { result.failure(new AskTimeoutException("Timed out")) }
val f = provider.scheduler.scheduleOnce(timeout.duration) { result.tryComplete(Left(new AskTimeoutException("Timed out"))) }
result onComplete { _ ⇒
try { a.stop(); f.cancel() }
try { try a.stop() finally f.cancel() }
finally { provider.unregisterTempActor(path) }
}
a
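// Hedged usage sketch (illustration only, not part of this diff): the ask pattern whose
// timeout handling is adjusted above. `target` is a hypothetical ActorRef; the implicit
// Timeout determines when the AskTimeoutException is produced.
object AskSketch {
  import akka.actor.ActorRef
  import akka.dispatch.Future
  import akka.pattern.ask
  import akka.util.Timeout
  import akka.util.duration._

  def demo(target: ActorRef): Future[Any] = {
    implicit val timeout = Timeout(5 seconds)
    target ? "request" // completes with Left(AskTimeoutException) if no reply arrives in time
  }
}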

View file

@ -171,7 +171,14 @@ trait RouterConfig {
def createRouteeProvider(context: ActorContext) = new RouteeProvider(context, resizer)
def createActor(): Router = new Router {}
def createActor(): Router = new Router {
override def supervisorStrategy: SupervisorStrategy = RouterConfig.this.supervisorStrategy
}
/**
* SupervisorStrategy for the created Router actor.
*/
def supervisorStrategy: SupervisorStrategy
/**
* Dispatcher ID to use for running the head actor, i.e. the [[akka.routing.Router]].
@ -308,10 +315,19 @@ trait Router extends Actor {
def routerReceive: Receive = {
case _ ⇒
}
override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {
// do not scrap children
}
}
private object Router {
case object Resize
val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case _ ⇒ SupervisorStrategy.Escalate
}
}
/**
@ -353,6 +369,7 @@ case class Destination(sender: ActorRef, recipient: ActorRef)
case object NoRouter extends RouterConfig {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null
def routerDispatcher: String = ""
def supervisorStrategy = null
override def withFallback(other: RouterConfig): RouterConfig = other
}
@ -363,6 +380,7 @@ case object FromConfig extends RouterConfig {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy
}
/**
@ -378,6 +396,8 @@ case class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatch
def createRoute(props: Props, routeeProvider: RouteeProvider): Route =
throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy
}
object RoundRobinRouter {
@ -402,12 +422,40 @@ object RoundRobinRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
children simultaneously; this is the default) or use the parent's strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with RoundRobinLike {
/**
@ -438,6 +486,12 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
}
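// Hedged usage sketch (illustration only, not part of this diff): giving the router's
// head actor a custom supervisorStrategy via the new parameter above. `Worker` is a
// hypothetical routee class.
object RouterSupervisionSketch {
  import akka.actor.{ Actor, ActorRefFactory, OneForOneStrategy, Props, SupervisorStrategy }

  class Worker extends Actor { def receive = { case msg ⇒ sender ! msg } }

  def demo(factory: ActorRefFactory) =
    factory.actorOf(Props[Worker].withRouter(
      RoundRobinRouter(nrOfInstances = 5, supervisorStrategy = OneForOneStrategy() {
        case _: IllegalStateException ⇒ SupervisorStrategy.Restart
        case _                        ⇒ SupervisorStrategy.Escalate
      })))
}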
trait RoundRobinLike { this: RouterConfig ⇒
@ -488,12 +542,40 @@ object RandomRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
children simultaneously; this is the default) or use the parent's strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with RandomLike {
/**
@ -524,6 +606,12 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
}
trait RandomLike { this: RouterConfig ⇒
@ -580,12 +668,40 @@ object SmallestMailboxRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
children simultaneously; this is the default) or use the parent's strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with SmallestMailboxLike {
/**
@ -616,6 +732,12 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
}
trait SmallestMailboxLike { this: RouterConfig ⇒
@ -731,12 +853,40 @@ object BroadcastRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
children simultaneously; this is the default) or use the parent's strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with BroadcastLike {
/**
@ -767,6 +917,12 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
}
trait BroadcastLike { this: RouterConfig ⇒
@ -808,13 +964,41 @@ object ScatterGatherFirstCompletedRouter {
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
children simultaneously; this is the default) or use the parent's strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* {{{
* class MyActor extends Actor {
* override val supervisorStrategy = ...
*
* val poolAsAWhole = context.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(5)))
*
* val poolIndividuals = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = this.supervisorStrategy)))
*
* val specialChild = context.actorOf(Props[SomeActor].withRouter(
* RoundRobinRouter(5, supervisorStrategy = OneForOneStrategy() {
* ...
* })))
* }
* }}}
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
extends RouterConfig with ScatterGatherFirstCompletedLike {
if (within <= Duration.Zero) throw new IllegalArgumentException(
@ -848,6 +1032,12 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
}
trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒

View file

@ -111,16 +111,14 @@ class JavaSerializer(val system: ExtendedActorSystem) extends Serializer {
def toBinary(o: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
out.writeObject(o)
JavaSerializer.currentSystem.withValue(system) { out.writeObject(o) }
out.close()
bos.toByteArray
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, new ByteArrayInputStream(bytes))
val obj = JavaSerializer.currentSystem.withValue(system) {
in.readObject
}
val obj = JavaSerializer.currentSystem.withValue(system) { in.readObject }
in.close()
obj
}

View file

@ -2,7 +2,7 @@
# Akka Agent Reference Config File #
####################################
# This the reference config file has all the default settings.
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.
akka {

View file

@ -2,7 +2,7 @@
# Akka Cluster Reference Config File #
######################################
# This the reference config file has all the default settings.
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.
akka {

View file

@ -15,3 +15,9 @@ Scalatra has Akka integration.
Read more here: `<https://github.com/scalatra/scalatra/blob/develop/akka/src/main/scala/org/scalatra/akka/AkkaSupport.scala>`_
Gatling
-------
Gatling is an Open Source Stress Tool.
Read more here: `<http://gatling-tool.org/>`_

View file

@ -66,6 +66,11 @@ behavior logic, or the function itself may be swapped out at runtime, see the
during construction of the actor object is special in the sense that a restart
of the actor will reset its behavior to this initial one.
.. note::
The initial behavior of an Actor is extracted before the constructor is run,
so if you want to base your initial behavior on member state, you should
use ``become`` in the constructor.
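A minimal Scala sketch of that workaround (class, member and message names are
hypothetical)::

   class Settings extends Actor {
     val defaults = Map("answer" -> 42)  // member state initialized in the constructor

     def receive = { case _ ⇒ }          // placeholder; extracted before the constructor body runs
     context.become {                    // install the real, state-dependent behavior here
       case key: String ⇒ sender ! defaults.get(key)
     }
   }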
Mailbox
-------
@ -127,7 +132,7 @@ Once an actor terminates, i.e. fails in a way which is not handled by a
restart, stops itself or is stopped by its supervisor, it will free up its
resources, draining all remaining messages from its mailbox into the systems
“dead letter mailbox”. The mailbox is then replaced within the actor reference
with a that system mailbox, redirecting all new messages “into the drain”. This
with a system mailbox, redirecting all new messages “into the drain”. This
is done on a best effort basis, though, so do not rely on it in order to
construct “guaranteed delivery”.

View file

@ -7,6 +7,8 @@ This chapter outlines the concept behind supervision, the primitives offered
and their semantics. For details on how that translates into real code, please
refer to the corresponding chapters for Scala and Java APIs.
.. _supervision-directives:
What Supervision Means
----------------------
@ -110,3 +112,39 @@ external resource, which may also be one of its own children. If a third party
terminates a child by way of the ``system.stop(child)`` method or sending a
:class:`PoisonPill`, the supervisor might well be affected.
One-For-One Strategy vs. All-For-One Strategy
---------------------------------------------
There are two classes of supervision strategies which come with Akka:
:class:`OneForOneStrategy` and :class:`AllForOneStrategy`. Both are configured
with a mapping from exception type to supervision directive (see
:ref:`above <supervision-directives>`) and limits on how often a child is allowed to fail
before terminating it. The difference between them is that the former applies
the obtained directive only to the failed child, whereas the latter applies it
to all siblings as well. Normally, you should use the
:class:`OneForOneStrategy`, which is also the default if none is specified
explicitly.
The :class:`AllForOneStrategy` is applicable in cases where the ensemble of
children has such tight dependencies among them that a failure of one child
affects the function of the others, i.e. they are inextricably linked. Since a
restart does not clear out the mailbox, it often is best to stop the children
upon failure and re-create them explicitly from the supervisor (by watching the
children's lifecycle); otherwise you have to make sure that it is no problem
for any of the actors to receive a message which was queued before the restart
but processed afterwards.
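As a hedged sketch (Scala; the exception types and retry limit are illustrative, and it
assumes ``import akka.actor.AllForOneStrategy`` and ``import akka.actor.SupervisorStrategy._``),
such a supervisor might declare::

   override val supervisorStrategy =
     AllForOneStrategy(maxNrOfRetries = 10) {
       case _: IllegalStateException ⇒ Stop
       case _: Exception             ⇒ Restart
     }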
Normally stopping a child (i.e. not in response to a failure) will not
automatically terminate the other children in an all-for-one strategy; that can
easily be done by watching their lifecycle: if the :class:`Terminated` message
is not handled by the supervisor, it will throw a :class:`DeathPactException`
which (depending on its supervisor) will restart it, and the default
:meth:`preRestart` action will terminate all children. Of course this can be
handled explicitly as well.
Please note that creating one-off actors from an all-for-one supervisor entails
that failures escalated by the temporary actor will affect all the permanent
ones. If this is not desired, install an intermediate supervisor; this can very
easily be done by declaring a router of size 1 for the worker, see
:ref:`routing-scala` or :ref:`routing-java`.
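A hedged Scala sketch of that last suggestion (the routee class and strategy are
illustrative only)::

   val oneOff = context.actorOf(Props[TemporaryWorker].withRouter(
     RoundRobinRouter(nrOfInstances = 1, supervisorStrategy = OneForOneStrategy() {
       case _: Exception ⇒ SupervisorStrategy.Restart
     })))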

View file

@ -150,7 +150,7 @@ public class FaultHandlingTestBase {
}
@Test
public void mustEmploySupervisorStrategy() {
public void mustEmploySupervisorStrategy() throws Exception {
// code here
//#testkit
EventFilter ex1 = (EventFilter) new ErrorFilter(ArithmeticException.class);

View file

@ -130,7 +130,7 @@ public class UntypedActorDocTestBase {
}
@Test
public void usingAsk() {
public void usingAsk() throws Exception {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(new UntypedActorFactory() {
public UntypedActor create() {
@ -188,7 +188,7 @@ public class UntypedActorDocTestBase {
}
@Test
public void useWatch() {
public void useWatch() throws Exception {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(WatchActor.class));
Future<Object> future = Patterns.ask(myActor, "kill", 1000);
@ -197,7 +197,7 @@ public class UntypedActorDocTestBase {
}
@Test
public void usePatternsGracefulStop() {
public void usePatternsGracefulStop() throws Exception {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef actorRef = system.actorOf(new Props(MyUntypedActor.class));
//#gracefulStop

View file

@ -20,8 +20,11 @@ import akka.event.LoggingAdapter;
//#imports-prio
//#imports-prio-mailbox
import akka.actor.ActorContext;
import akka.dispatch.PriorityGenerator;
import akka.dispatch.UnboundedPriorityMailbox;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import com.typesafe.config.Config;
//#imports-prio-mailbox
@ -57,19 +60,17 @@ public class DispatcherDocTestBase {
@Test
public void defineDispatcher() {
//#defining-dispatcher
ActorRef myActor1 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
"myactor1");
ActorRef myActor2 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
"myactor2");
ActorRef myActor =
system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
"myactor3");
//#defining-dispatcher
}
@Test
public void definePinnedDispatcher() {
//#defining-pinned-dispatcher
String name = "myactor";
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)
.withDispatcher("myactor-dispatcher"), name);
.withDispatcher("my-pinned-dispatcher"));
//#defining-pinned-dispatcher
}
@ -77,11 +78,13 @@ public class DispatcherDocTestBase {
public void priorityDispatcher() throws Exception {
//#prio-dispatcher
ActorRef myActor = system.actorOf( // We create a new Actor that just prints out what it processes
// We create a new Actor that just prints out what it processes
ActorRef myActor = system.actorOf(
new Props().withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new UntypedActor() {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
LoggingAdapter log =
Logging.getLogger(getContext().system(), this);
{
getSelf().tell("lowpriority");
getSelf().tell("lowpriority");
@ -98,7 +101,7 @@ public class DispatcherDocTestBase {
}
};
}
}).withDispatcher("prio-dispatcher-java"));
}).withDispatcher("prio-dispatcher"));
/*
Logs:
@ -120,24 +123,22 @@ public class DispatcherDocTestBase {
}
//#prio-mailbox
public static class PrioMailbox extends UnboundedPriorityMailbox {
static final PriorityGenerator generator = new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important
@Override
public int gen(Object message) {
if (message.equals("highpriority"))
return 0; // 'highpriority messages should be treated first if possible
else if (message.equals("lowpriority"))
return 100; // 'lowpriority messages should be treated last if possible
else if (message.equals(Actors.poisonPill()))
return 1000; // PoisonPill when no other left
else
return 50; // We default to 50
}
};
public PrioMailbox(Config config) {
super(generator);
public static class MyPrioMailbox extends UnboundedPriorityMailbox {
public MyPrioMailbox(Config config) { // needed for reflective instantiation
// Create a new PriorityGenerator, lower prio means more important
super(new PriorityGenerator() {
@Override
public int gen(Object message) {
if (message.equals("highpriority"))
return 0; // 'highpriority messages should be treated first if possible
else if (message.equals("lowpriority"))
return 2; // 'lowpriority messages should be treated last if possible
else if (message.equals(Actors.poisonPill()))
return 3; // PoisonPill when no other left
else
return 1; // By default they go between high and low prio
}
});
}
}
//#prio-mailbox

View file

@ -33,7 +33,7 @@ import akka.actor.DeadLetter;
//#imports-deadletter
public class LoggingDocTestBase {
@Test
public void useLoggingActor() {
ActorSystem system = ActorSystem.create("MySystem");
@ -55,6 +55,16 @@ public class LoggingDocTestBase {
//#deadletters
system.shutdown();
}
@Test
public void demonstrateMultipleArgs() {
final ActorSystem system = ActorSystem.create("multiArg");
//#array
final Object[] args = new Object[] { "The", "brown", "fox", "jumps", 42 };
system.log().debug("five parameters: {}, {}, {}, {}, {}", args);
//#array
system.shutdown();
}
//#my-actor
class MyActor extends UntypedActor {

View file

@ -5,8 +5,6 @@ package akka.docs.future;
//#imports1
import akka.dispatch.*;
import akka.japi.Procedure;
import akka.japi.Procedure2;
import akka.util.Timeout;
//#imports1
@ -41,9 +39,17 @@ import static akka.dispatch.Futures.reduce;
//#imports6
//#imports7
import akka.dispatch.ExecutionContexts;
import akka.dispatch.ExecutionContextExecutorService;
//#imports7
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.After;
import org.junit.Before;
@ -73,8 +79,22 @@ public class FutureDocTestBase {
system.shutdown();
}
@Test public void useCustomExecutionContext() throws Exception {
ExecutorService yourExecutorServiceGoesHere = Executors.newSingleThreadExecutor();
//#diy-execution-context
ExecutionContextExecutorService ec =
ExecutionContexts.fromExecutorService(yourExecutorServiceGoesHere);
//Use ec with your Futures
Future<String> f1 = Futures.successful("foo", ec);
// Then you shut the ec down somewhere at the end of your program/application.
ec.shutdown();
//#diy-execution-context
}
@Test
public void useBlockingFromActor() {
public void useBlockingFromActor() throws Exception {
ActorRef actor = system.actorOf(new Props(MyActor.class));
String msg = "hello";
//#ask-blocking
@ -86,7 +106,7 @@ public class FutureDocTestBase {
}
@Test
public void useFutureEval() {
public void useFutureEval() throws Exception {
//#future-eval
Future<String> f = future(new Callable<String>() {
public String call() {
@ -99,7 +119,7 @@ public class FutureDocTestBase {
}
@Test
public void useMap() {
public void useMap() throws Exception {
//#map
Future<String> f1 = future(new Callable<String>() {
public String call() {
@ -162,7 +182,7 @@ public class FutureDocTestBase {
}
@Test
public void useFlatMap() {
public void useFlatMap() throws Exception {
//#flat-map
Future<String> f1 = future(new Callable<String>() {
public String call() {
@ -186,7 +206,7 @@ public class FutureDocTestBase {
}
@Test
public void useSequence() {
public void useSequence() throws Exception {
List<Future<Integer>> source = new ArrayList<Future<Integer>>();
source.add(Futures.successful(1, system.dispatcher()));
source.add(Futures.successful(2, system.dispatcher()));
@ -214,7 +234,7 @@ public class FutureDocTestBase {
}
@Test
public void useTraverse() {
public void useTraverse() throws Exception {
//#traverse
//Just a sequence of Strings
Iterable<String> listStrings = Arrays.asList("a", "b", "c");
@ -236,7 +256,7 @@ public class FutureDocTestBase {
}
@Test
public void useFold() {
public void useFold() throws Exception {
List<Future<String>> source = new ArrayList<Future<String>>();
source.add(Futures.successful("a", system.dispatcher()));
source.add(Futures.successful("b", system.dispatcher()));
@ -258,7 +278,7 @@ public class FutureDocTestBase {
}
@Test
public void useReduce() {
public void useReduce() throws Exception {
List<Future<String>> source = new ArrayList<Future<String>>();
source.add(Futures.successful("a", system.dispatcher()));
source.add(Futures.successful("b", system.dispatcher()));
@ -280,7 +300,7 @@ public class FutureDocTestBase {
}
@Test
public void useSuccessfulAndFailed() {
public void useSuccessfulAndFailed() throws Exception {
//#successful
Future<String> future = Futures.successful("Yay!", system.dispatcher());
//#successful
@ -294,7 +314,7 @@ public class FutureDocTestBase {
}
@Test
public void useFilter() {
public void useFilter() throws Exception {
//#filter
Future<Integer> future1 = Futures.successful(4, system.dispatcher());
Future<Integer> successfulFilter = future1.filter(new Filter<Integer>() {
@ -324,10 +344,10 @@ public class FutureDocTestBase {
public void useAndThen() {
//#and-then
Future<String> future1 = Futures.successful("value", system.dispatcher()).andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null)
sendToIssueTracker(failure);
}
public void onComplete(Throwable failure, String result) {
if (failure != null)
sendToIssueTracker(failure);
}
}).andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (result != null)
@ -338,7 +358,7 @@ public class FutureDocTestBase {
}
@Test
public void useRecover() {
public void useRecover() throws Exception {
//#recover
Future<Integer> future = future(new Callable<Integer>() {
public Integer call() {
@ -358,7 +378,7 @@ public class FutureDocTestBase {
}
@Test
public void useTryRecover() {
public void useTryRecover() throws Exception {
//#try-recover
Future<Integer> future = future(new Callable<Integer>() {
public Integer call() {
@ -382,7 +402,7 @@ public class FutureDocTestBase {
}
@Test
public void useOnSuccessOnFailureAndOnComplete() {
public void useOnSuccessOnFailureAndOnComplete() throws Exception {
{
Future<String> future = Futures.successful("foo", system.dispatcher());
@ -416,20 +436,20 @@ public class FutureDocTestBase {
Future<String> future = Futures.successful("foo", system.dispatcher());
//#onComplete
future.onComplete(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null) {
//We got a failure, handle it here
} else {
// We got a result, do something with it
public void onComplete(Throwable failure, String result) {
if (failure != null) {
//We got a failure, handle it here
} else {
// We got a result, do something with it
}
}
}
});
//#onComplete
}
}
@Test
public void useOrAndZip() {
public void useOrAndZip() throws Exception {
{
//#zip
Future<String> future1 = Futures.successful("foo", system.dispatcher());

View file

@ -52,10 +52,20 @@ public class CustomRouterDocTestBase {
.withDispatcher("workers")); // MyActor workers run on "workers" dispatcher
//#dispatchers
}
@Test
public void demonstrateSupervisor() {
//#supervision
final SupervisorStrategy strategy = new OneForOneStrategy(5, Duration.parse("1 minute"),
new Class<?>[] { Exception.class });
final ActorRef router = system.actorOf(new Props(MyActor.class)
.withRouter(new RoundRobinRouter(5).withSupervisorStrategy(strategy)));
//#supervision
}
//#crTest
@Test
public void countVotesAsIntendedNotAsInFlorida() {
public void countVotesAsIntendedNotAsInFlorida() throws Exception {
ActorRef routedActor = system.actorOf(new Props().withRouter(new VoteCountRouter()));
routedActor.tell(DemocratVote);
routedActor.tell(DemocratVote);
@ -123,6 +133,10 @@ public class CustomRouterDocTestBase {
@Override public String routerDispatcher() {
return Dispatchers.DefaultDispatcherId();
}
@Override public SupervisorStrategy supervisorStrategy() {
return SupervisorStrategy.defaultStrategy();
}
//#crRoute
@Override

View file

@ -18,7 +18,7 @@ import akka.dispatch.Await;
//#parentActor
public class ParentActor extends UntypedActor {
public void onReceive(Object msg) {
public void onReceive(Object msg) throws Exception {
if (msg.equals("rrr")) {
//#roundRobinRouter
ActorRef roundRobinRouter = getContext().actorOf(

View file

@ -5,10 +5,13 @@ package akka.docs.jrouting;
import akka.routing.RoundRobinRouter;
import akka.routing.DefaultResizer;
import akka.routing.RemoteRouterConfig;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressExtractor;
import java.util.Arrays;
public class RouterViaProgramExample {
@ -67,6 +70,14 @@ public class RouterViaProgramExample {
for (int i = 1; i <= 6; i++) {
router3.tell(new ExampleActor.Message(i));
}
//#remoteRoutees
Address addr1 = new Address("akka", "remotesys", "otherhost", 1234);
Address addr2 = AddressExtractor.parse("akka://othersys@anotherhost:1234");
Address[] addresses = new Address[] { addr1, addr2 };
ActorRef routerRemote = system.actorOf(new Props(ExampleActor.class)
.withRouter(new RemoteRouterConfig(new RoundRobinRouter(5), addresses)));
//#remoteRoutees
}
private class CompileCheckJavaDocsForRouting extends UntypedActor {

View file

@ -20,7 +20,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
public class TransactorDocTest {
@Test
public void coordinatedExample() {
public void coordinatedExample() throws Exception {
//#coordinated-example
ActorSystem system = ActorSystem.create("CoordinatedExample");
@ -63,7 +63,7 @@ public class TransactorDocTest {
}
@Test
public void counterTransactor() {
public void counterTransactor() throws Exception {
ActorSystem system = ActorSystem.create("CounterTransactor");
ActorRef counter = system.actorOf(new Props(Counter.class));
@ -79,7 +79,7 @@ public class TransactorDocTest {
}
@Test
public void friendlyCounterTransactor() {
public void friendlyCounterTransactor() throws Exception {
ActorSystem system = ActorSystem.create("FriendlyCounterTransactor");
ActorRef friend = system.actorOf(new Props(Counter.class));
ActorRef friendlyCounter = system.actorOf(new Props(FriendlyCounter.class));

View file

@ -7,204 +7,157 @@ Dispatchers (Java)
.. contents:: :local:
The Dispatcher is an important piece that allows you to configure the right semantics and parameters for optimal performance, throughput and scalability. Different Actors have different needs.
Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions of threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread.
The event-based Actors currently consume ~600 bytes per Actor which means that you can create more than 6.5 million Actors on 4 GB RAM.
An Akka ``MessageDispatcher`` is what makes Akka Actors "tick", it is the engine of the machine so to speak.
All ``MessageDispatcher`` implementations are also an ``ExecutionContext``, which means that they can be used
to execute arbitrary code, for instance :ref:`futures-java`.
Default dispatcher
------------------
For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created.
The default dispatcher is available from the ``ActorSystem.dispatcher`` and can be configured in the ``akka.actor.default-dispatcher``
section of the :ref:`configuration`.
Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``.
The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases.
If you are starting to get contention on the single dispatcher (the ``Executor`` and its queue) or want to group a specific set of Actors
for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher.
See below for details on which ones are available and how they can be configured.
Setting the dispatcher for an Actor
-----------------------------------
.. warning::
Try to stick to a sensible default dispatcher; that means avoiding CallingThreadDispatcher, BalancingDispatcher or PinnedDispatcher
as the default-dispatcher. This is because they have very specific requirements from the environment in which they are used.
So in case you want to give your ``Actor`` a different dispatcher than the default, you need to do two things, of which the first is:
Setting the dispatcher
----------------------
.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-dispatcher
You specify the id of the dispatcher to use when creating an actor. The id corresponds to the :ref:`configuration` key
of the dispatcher settings.
.. note::
The "dispatcherId" you specify in withDispatcher is in fact a path into your configuration.
So in this example it's a top-level section, but you could for instance put it as a sub-section,
where you'd use periods to denote sub-sections, like this: ``"foo.bar.my-dispatcher"``
.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java
:include: imports,defining-dispatcher
And then you just need to configure that dispatcher in your configuration:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config
And here's another example that uses the "thread-pool-executor":
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config
For more options, see the default-dispatcher section of the :ref:`configuration`.
Types of dispatchers
--------------------
There are 4 different types of message dispatchers:
* Thread-based (Pinned)
* Event-based
* Priority event-based
* Work-sharing (Balancing)
* Dispatcher
It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments.
- Sharability: Unlimited
Example of a custom event-based dispatcher, which can be used with
``new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher")``
as in the example above:
- Mailboxes: Any, creates one per Actor
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config
- Use cases: Default dispatcher, Bulkheading
Default values are taken from ``default-dispatcher``, i.e. not all options need to be defined. See
:ref:`configuration` for the default values of the ``default-dispatcher``. You can also override
the values for the ``default-dispatcher`` in your configuration.
- Driven by: ``java.util.concurrent.ExecutorService``
specify using "executor" using "fork-join-executor",
"thread-pool-executor" or the FQCN of
an ``akka.dispatcher.ExecutorServiceConfigurator``
.. note::
* PinnedDispatcher
It should be noted that the ``dispatcher-id`` used in :class:`Props` is in
fact an absolute path into the configuration object, i.e. you can declare a
dispatcher configuration nested within other configuration objects and refer
to it like so: ``"my.config.object.myAwesomeDispatcher"``
- Sharability: None
There are two different executor services:
- Mailboxes: Any, creates one per Actor
* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for
``default-dispatcher``.
* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``.
- Use cases: Bulkheading
Note that the pool size is configured differently for the two executor services. The configuration above
is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``:
- Driven by: Any ``akka.dispatch.ThreadPoolExecutorConfigurator``
by default a "thread-pool-executor"
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config
Let's now walk through the different dispatchers in more detail.

Thread-based
^^^^^^^^^^^^

The ``PinnedDispatcher`` binds a dedicated OS thread to each specific Actor. The messages are posted to a
`LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
which feeds the messages to the dispatcher one by one. A ``PinnedDispatcher`` cannot be shared between actors. This dispatcher
has worse performance and scalability than the event-based dispatcher, but it works great for "daemon" Actors that consume
a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage of
this dispatcher is that Actors do not block threads for each other.

The ``PinnedDispatcher`` is configured like this:

.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config

Note that it must be used with ``executor = "thread-pool-executor"``.

And then using it:

.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-pinned-dispatcher
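Following the same pattern, here is a rough, self-contained sketch of a ``PinnedDispatcher`` in action. The dispatcher name mirrors
the configuration above; the actor and the printed output are only illustrative.

.. code-block:: java

  import akka.actor.ActorRef;
  import akka.actor.ActorSystem;
  import akka.actor.Props;
  import akka.actor.UntypedActor;
  import com.typesafe.config.ConfigFactory;

  public class PinnedDispatcherExample {

    public static class MyUntypedActor extends UntypedActor {
      public void onReceive(Object message) {
        // Every instance of this actor is served by its own dedicated thread.
        System.out.println(Thread.currentThread().getName() + ": " + message);
      }
    }

    public static void main(String[] args) {
      ActorSystem system = ActorSystem.create("MySystem", ConfigFactory.parseString(
          "my-pinned-dispatcher {\n" +
          "  executor = \"thread-pool-executor\"\n" +
          "  type = PinnedDispatcher\n" +
          "}\n"));

      ActorRef a = system.actorOf(
          new Props(MyUntypedActor.class).withDispatcher("my-pinned-dispatcher"), "a");
      ActorRef b = system.actorOf(
          new Props(MyUntypedActor.class).withDispatcher("my-pinned-dispatcher"), "b");

      a.tell("hello"); // printed from one dedicated thread
      b.tell("hello"); // printed from another dedicated thread

      system.shutdown();
    }
  }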
Event-based
^^^^^^^^^^^

The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed by a
`BlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html>`_. This dispatcher is highly configurable
and lets you configure the ``BlockingQueue`` (type of queue, max items, etc.) as well as the thread pool.

The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g.
the Actors you create from ``system.actorOf``, get its own dispatcher but reuse that dispatcher for each new Actor
that the top-level Actor creates. You can also share a dispatcher between multiple top-level Actors. This is very use-case specific
and needs to be tried out on a case-by-case basis. The important thing is that Akka tries to provide you with the freedom you need to
design and implement your system in the most efficient way with regard to performance, throughput and latency.

It comes with many different predefined BlockingQueue configurations:

* Bounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
* Unbounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
* Bounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
* Unbounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
* `SynchronousQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html>`_

When a bounded queue has grown to its defined limit, message processing will run in the caller's
thread as a way to slow the producer down and balance producer and consumer.

Here is an example of a bounded mailbox:

.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config

The standard :class:`Dispatcher` allows you to define the ``throughput`` it
should have, as shown above. This defines the number of messages for a specific
Actor the dispatcher should process in one single sweep; in other words, the
dispatcher will batch process up to ``throughput`` messages together when
having elected an actor to run. Setting this to a higher number will increase
throughput but lower fairness, and vice versa. If you don't specify it explicitly
then it uses the value (5) defined for ``default-dispatcher`` in the :ref:`configuration`.

Browse the `ScalaDoc <scaladoc>`_ or look at the code for all the options available.

Priority event-based
^^^^^^^^^^^^^^^^^^^^

Sometimes it's useful to be able to specify the priority order of messages. This is done by using a ``Dispatcher`` and supplying
an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]``, or by using an
``akka.dispatch.PriorityGenerator`` (recommended).

Creating a Dispatcher with a mailbox using a PriorityGenerator:

Config:

.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala
   :include: prio-dispatcher-config-java

Priority mailbox:

.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java
   :include: imports-prio-mailbox,prio-mailbox

Usage:

.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java
   :include: imports-prio,prio-dispatcher
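As a sketch of just the ``PriorityGenerator`` part: the message values and priority numbers below are made-up examples, and wiring the
generator into a mailbox and dispatcher is what the included snippets above show.

.. code-block:: java

  import akka.dispatch.PriorityGenerator;

  public class MyPriorityGenerators {

    // Lower values mean higher priority.
    public static final PriorityGenerator prioGenerator = new PriorityGenerator() {
      @Override
      public int gen(Object message) {
        if ("highpriority".equals(message))
          return 0;   // handled first, if possible
        else if ("lowpriority".equals(message))
          return 100; // handled last, if possible
        else
          return 50;  // everything else goes in between
      }
    };
  }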
Work-sharing event-based
^^^^^^^^^^^^^^^^^^^^^^^^

The ``BalancingDispatcher`` is a variation of the ``Dispatcher`` in which Actors of the same type can be set up to
share this dispatcher; during execution the different actors will steal messages from other actors if they
have fewer messages to process.

Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
best described as "work donating", because the actor from which work is taken takes the initiative.

This can be a great way to improve throughput at the cost of slightly higher latency.

.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-balancing-config

Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques <http://janvanbesien.blogspot.com/2010/03/load-balancing-actors-with-work.html>`_

Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala <http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/>`_

Mailboxes
---------

An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``.
Normally each ``Actor`` has its own mailbox, but with for example a ``BalancingDispatcher`` all actors with the same ``BalancingDispatcher`` will share a single instance.

Builtin implementations
^^^^^^^^^^^^^^^^^^^^^^^

Akka comes shipped with a number of default mailbox implementations:

* UnboundedMailbox

  - Backed by a ``java.util.concurrent.ConcurrentLinkedQueue``
  - Blocking: No
  - Bounded: No

* BoundedMailbox

  - Backed by a ``java.util.concurrent.LinkedBlockingQueue``
  - Blocking: Yes
  - Bounded: Yes

* UnboundedPriorityMailbox

  - Backed by a ``java.util.concurrent.PriorityBlockingQueue``
  - Blocking: Yes
  - Bounded: No

* BoundedPriorityMailbox

  - Backed by a ``java.util.PriorityBlockingQueue`` wrapped in an ``akka.util.BoundedBlockingQueue``
  - Blocking: Yes
  - Bounded: Yes

* Durable mailboxes, see :ref:`durable-mailboxes`.

Mailbox configuration examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

How to create a PriorityMailbox:

.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherTestBase.java#prio-mailbox

And then add it to the configuration:

.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher-config

And then an example on how you would use it:

.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#prio-dispatcher

Making the Actor mailbox bounded
--------------------------------

Global configuration
^^^^^^^^^^^^^^^^^^^^

You can make the Actor mailbox bounded by a capacity in two ways. The first is to define it in the :ref:`configuration` file under
``default-dispatcher``. This will set it globally as the default for the DefaultDispatcher and for other configured dispatchers,
if not specified otherwise.

.. code-block:: ruby

  akka {
    actor {
      default-dispatcher {
        # If negative (or zero) then an unbounded mailbox is used (default)
        # If positive then a bounded mailbox is used and the capacity is set to the number specified
        mailbox-capacity = 1000
      }
    }
  }

Per-instance based configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can also do it on a specific dispatcher instance.

.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config

For the ``PinnedDispatcher``, which is non-shareable between actors and associates a dedicated Thread with the actor,
making it bounded (by specifying a capacity) is optional; but if you do, you need to provide a pushTimeout (the default is 10 seconds).
When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
if the message cannot be added to the mailbox within the time specified by the pushTimeout.
View file
@ -22,6 +22,9 @@ which is very similar to a ``java.util.concurrent.Executor``. if you have an ``A
it will use its default dispatcher as the ``ExecutionContext``, or you can use the factory methods provided
by the ``ExecutionContexts`` class to wrap ``Executors`` and ``ExecutorServices``, or even create your own.
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: imports1,imports7,diy-execution-context
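As a minimal sketch of wrapping your own ``ExecutorService``: the factory method name ``fromExecutorService`` is assumed here, and the
pool size is arbitrary.

.. code-block:: java

  import akka.dispatch.ExecutionContext;
  import akka.dispatch.ExecutionContexts;
  import akka.dispatch.Future;
  import akka.dispatch.Futures;

  import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  public class DiyExecutionContextExample {
    public static void main(String[] args) {
      ExecutorService pool = Executors.newFixedThreadPool(2);

      // Wrap the plain ExecutorService so it can be used wherever an
      // ExecutionContext is expected.
      ExecutionContext ec = ExecutionContexts.fromExecutorService(pool);

      Future<String> f = Futures.future(new Callable<String>() {
        public String call() {
          return "Hello";
        }
      }, ec);

      pool.shutdown();
    }
  }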
Use with Actors
---------------
View file
@ -30,8 +30,13 @@ object is translated to a String according to the following rules:
* in case of a class an approximation of its simpleName
* and in all other cases the simpleName of its class
The log message may contain argument placeholders ``{}``, which will be
substituted if the log level is enabled. Giving more arguments than there are
placeholders results in a warning being appended to the log statement (i.e. on
the same line with the same severity). You may pass a Java array as the only
substitution argument to have its elements be treated individually:
.. includecode:: code/akka/docs/event/LoggingDocTestBase.java#array
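For instance, a sketch of the array form inside an actor (the actor and message names are illustrative):

.. code-block:: java

  import akka.actor.UntypedActor;
  import akka.event.Logging;
  import akka.event.LoggingAdapter;

  public class MyLoggingActor extends UntypedActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    public void onReceive(Object message) {
      // The array is the only substitution argument, so its elements
      // fill the two placeholders one by one.
      log.info("received {} from {}", new Object[] { message, getSender() });
    }
  }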
The Java :class:`Class` of the log source is also included in the generated
:class:`LogEvent`. In case of a simple string this is replaced with a “marker”
View file
@ -249,5 +249,48 @@ Observe how the name of the server actor matches the deployment given in the
configuration file, which will transparently delegate the actor creation to the
remote node.
Remote Events
-------------
It is possible to listen to events that occur in Akka Remote, and to subscribe/unsubscribe to these events;
you simply register as a listener to the below described types on the ``ActorSystem.eventStream``.
.. note::
To subscribe to any outbound-related events, subscribe to ``RemoteClientLifeCycleEvent``
To subscribe to any inbound-related events, subscribe to ``RemoteServerLifeCycleEvent``
To subscribe to any remote events, subscribe to ``RemoteLifeCycleEvent``
To intercept when an outbound connection is disconnected, you listen to ``RemoteClientDisconnected`` which
holds the transport used (RemoteTransport) and the outbound address that was disconnected (Address).
To intercept when an outbound connection is connected, you listen to ``RemoteClientConnected`` which
holds the transport used (RemoteTransport) and the outbound address that was connected to (Address).
To intercept when an outbound client is started you listen to ``RemoteClientStarted``
which holds the transport used (RemoteTransport) and the outbound address that it is connected to (Address).
To intercept when an outbound client is shut down you listen to ``RemoteClientShutdown``
which holds the transport used (RemoteTransport) and the outbound address that it was connected to (Address).
To intercept when an outbound message cannot be sent, you listen to ``RemoteClientWriteFailed`` which holds
the payload that was not written (AnyRef), the cause of the failed send (Throwable),
the transport used (RemoteTransport) and the outbound address that was the destination (Address).
For general outbound-related errors, that do not classify as any of the others, you can listen to ``RemoteClientError``,
which holds the cause (Throwable), the transport used (RemoteTransport) and the outbound address (Address).
To intercept when an inbound server is started (typically only once) you listen to ``RemoteServerStarted``
which holds the transport that it will use (RemoteTransport).
To intercept when an inbound server is shut down (typically only once) you listen to ``RemoteServerShutdown``
which holds the transport that it used (RemoteTransport).
To intercept when an inbound connection has been established you listen to ``RemoteServerClientConnected``
which holds the transport used (RemoteTransport) and optionally the address that connected (Option<Address>).
To intercept when an inbound connection has been disconnected you listen to ``RemoteServerClientDisconnected``
which holds the transport used (RemoteTransport) and optionally the address that disconnected (Option<Address>).
To intercept when an inbound remote client has been closed you listen to ``RemoteServerClientClosed``
which holds the transport used (RemoteTransport) and optionally the address of the remote client that was closed (Option<Address>).
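For example, a minimal sketch of such a listener, assuming ``akka-remote`` is on the classpath; it is just an ordinary actor subscribed
on the event stream.

.. code-block:: java

  import akka.actor.ActorRef;
  import akka.actor.ActorSystem;
  import akka.actor.Props;
  import akka.actor.UntypedActor;
  import akka.remote.RemoteLifeCycleEvent;

  public class RemoteEventListenerExample {

    public static class Listener extends UntypedActor {
      public void onReceive(Object message) {
        if (message instanceof RemoteLifeCycleEvent) {
          // React to (or simply log) the remoting event.
          System.out.println("remoting event: " + message);
        }
      }
    }

    public static void main(String[] args) {
      ActorSystem system = ActorSystem.create("MySystem");
      ActorRef listener = system.actorOf(new Props(Listener.class), "remote-listener");

      // Subscribe to all remote life-cycle events published on the event stream.
      system.eventStream().subscribe(listener, RemoteLifeCycleEvent.class);
    }
  }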
View file
@ -54,6 +54,18 @@ Once you have the router actor it is just to send messages to it as you would to
The router will apply its behavior to the message it receives and forward it to the routees.
Remotely Deploying Routees
**************************
In addition to being able to supply looked-up remote actors as routees, you can
make the router deploy its created children on a set of remote hosts; this will
be done in round-robin fashion. In order to do that, wrap the router
configuration in a :class:`RemoteRouterConfig`, attaching the remote addresses of
the nodes to deploy to. Naturally, this requires you to include the
``akka-remote`` module on your classpath:
.. includecode:: code/akka/docs/jrouting/RouterViaProgramExample.java#remoteRoutees
How Routing is Designed within Akka
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@ -92,6 +104,29 @@ to the actor hierarchy, changing the actor paths of all children of the router.
The routees especially do need to know that they are routed to in order to
choose the sender reference for any messages they dispatch as shown above.
Routers vs. Supervision
^^^^^^^^^^^^^^^^^^^^^^^
As explained in the previous section, routers create new actor instances as
children of the “head” router, which therefore also is their supervisor. The
supervisor strategy of this actor can be configured by means of the
:meth:`RouterConfig.supervisorStrategy` property, which is supported for all
built-in router types. It defaults to “always escalate”, which leads to the
application of the router's parent's supervision directive to all children of
the router uniformly (i.e. not only the one which failed). It should be
mentioned that the router overrides the default behavior of terminating all
children upon restart, which means that a restart—while re-creating them—does
not have an effect on the number of actors in the pool.
Setting the strategy is easily done:
.. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java
:include: supervision
Another potentially useful approach is to give the router the same strategy as
its parent, which effectively treats all actors in the pool as if they were
direct children of their grand-parent instead.
Router usage
^^^^^^^^^^^^
Some files were not shown because too many files have changed in this diff.