Support stop or keep of child actors when parent is restarted, #25556

* stash messages and signals while waiting for children to be stopped
* handle watch of other actors
* exception from unstash
* exception from first setup
* merge RestartSupervisor and BackoffSupervisor
* API change: restartWithLimit => restart.withLimit
* remove unused PreStart
* docs
* move BubblingSample to separate class
* fix: fail after more than limit in restart.withLimit when deferred factory throws
* match case RestartOrBackoff instead
This commit is contained in:
Patrik Nordwall 2019-01-17 16:48:22 +01:00 committed by GitHub
parent ac6dd4966e
commit b4fa591d64
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 976 additions and 451 deletions

View file

@ -9,5 +9,5 @@ import scala.util.control.NoStackTrace
/**
* A predefined exception that can be used in tests. It doesn't include a stack trace.
*/
class TestException(message: String) extends RuntimeException(message) with NoStackTrace
final case class TestException(message: String) extends RuntimeException(message) with NoStackTrace

View file

@ -13,11 +13,6 @@ import akka.actor.typed.ActorSystem
import scala.util.control.NoStackTrace
/**
* Exception without stack trace to use for verifying exceptions in tests
*/
final case class TE(message: String) extends RuntimeException(message) with NoStackTrace
object TestKitSettings {
/**
* Reads configuration settings from `akka.actor.testkit.typed` section.

View file

@ -115,7 +115,7 @@ public class ActorCompile {
SupervisorStrategy strategy1 = SupervisorStrategy.restart();
SupervisorStrategy strategy2 = SupervisorStrategy.restart().withLoggingEnabled(false);
SupervisorStrategy strategy3 = SupervisorStrategy.resume();
SupervisorStrategy strategy4 = SupervisorStrategy.restartWithLimit(3, Duration.ofSeconds(1));
SupervisorStrategy strategy4 = SupervisorStrategy.restart().withLimit(3, Duration.ofSeconds(1));
SupervisorStrategy strategy5 =
SupervisorStrategy.restartWithBackoff(Duration.ofMillis(200), Duration.ofSeconds(10), 0.1);

View file

@ -0,0 +1,86 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
// #bubbling-example
public class BubblingSample {
interface Message {}
public static class Fail implements Message {
public final String text;
public Fail(String text) {
this.text = text;
}
}
public static Behavior<Message> failingChildBehavior =
Behaviors.receive(Message.class)
.onMessage(
Fail.class,
(context, message) -> {
throw new RuntimeException(message.text);
})
.build();
public static Behavior<Message> middleManagementBehavior =
Behaviors.setup(
(context) -> {
context.getLog().info("Middle management starting up");
final ActorRef<Message> child = context.spawn(failingChildBehavior, "child");
// we want to know when the child terminates, but since we do not handle
// the Terminated signal, we will in turn fail on child termination
context.watch(child);
// here we don't handle Terminated at all which means that
// when the child fails or stops gracefully this actor will
// fail with a DeathWatchException
return Behaviors.receive(Message.class)
.onMessage(
Message.class,
(innerCtx, message) -> {
// just pass messages on to the child
child.tell(message);
return Behaviors.same();
})
.build();
});
public static Behavior<Message> bossBehavior =
Behaviors.setup(
(context) -> {
context.getLog().info("Boss starting up");
final ActorRef<Message> middleManagement =
context.spawn(middleManagementBehavior, "middle-management");
context.watch(middleManagement);
// here we don't handle Terminated at all which means that
// when middle management fails with a DeathWatchException
// this actor will also fail
return Behaviors.receive(Message.class)
.onMessage(
Message.class,
(innerCtx, message) -> {
// just pass messages on to the child
middleManagement.tell(message);
return Behaviors.same();
})
.build();
});
public static void main(String[] args) {
final ActorSystem<Message> system = ActorSystem.create(bossBehavior, "boss");
system.tell(new Fail("boom"));
// this will now bubble up all the way to the boss and as that is the user guardian it means
// the entire actor system will stop
}
}
// #bubbling-example

View file

@ -0,0 +1,39 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.typed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.internal.adapter.ActorSystemAdapter;
import akka.testkit.javadsl.EventFilter;
import com.typesafe.config.ConfigFactory;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.util.concurrent.TimeUnit;
public class BubblingSampleTest extends JUnitSuite {
@Test
public void testBubblingSample() throws Exception {
final ActorSystem<BubblingSample.Message> system =
ActorSystem.create(
BubblingSample.bossBehavior,
"boss",
ConfigFactory.parseString(
"akka.loggers = [ akka.testkit.TestEventListener ]\n" + "akka.loglevel=warning"));
// actual exception and then the deathpacts
new EventFilter(Exception.class, ActorSystemAdapter.toUntyped(system))
.occurrences(4)
.intercept(
() -> {
system.tell(new BubblingSample.Fail("boom"));
return null;
});
system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS);
}
}

View file

@ -1,118 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.internal.adapter.ActorSystemAdapter;
import akka.actor.typed.javadsl.Behaviors;
import akka.testkit.javadsl.EventFilter;
import com.typesafe.config.ConfigFactory;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
public class FaultToleranceDocTest extends JUnitSuite {
// #bubbling-example
interface Message {}
class Fail implements Message {
public final String text;
Fail(String text) {
this.text = text;
}
}
// #bubbling-example
@Test
public void bubblingSample() {
// #bubbling-example
final Behavior<Message> failingChildBehavior =
Behaviors.receive(Message.class)
.onMessage(
Fail.class,
(context, message) -> {
throw new RuntimeException(message.text);
})
.build();
Behavior<Message> middleManagementBehavior =
Behaviors.setup(
(context) -> {
context.getLog().info("Middle management starting up");
final ActorRef<Message> child = context.spawn(failingChildBehavior, "child");
// we want to know when the child terminates, but since we do not handle
// the Terminated signal, we will in turn fail on child termination
context.watch(child);
// here we don't handle Terminated at all which means that
// when the child fails or stops gracefully this actor will
// fail with a DeathWatchException
return Behaviors.receive(Message.class)
.onMessage(
Message.class,
(innerCtx, message) -> {
// just pass messages on to the child
child.tell(message);
return Behaviors.same();
})
.build();
});
Behavior<Message> bossBehavior =
Behaviors.setup(
(context) -> {
context.getLog().info("Boss starting up");
final ActorRef<Message> middleManagement =
context.spawn(middleManagementBehavior, "middle-management");
context.watch(middleManagement);
// here we don't handle Terminated at all which means that
// when middle management fails with a DeathWatchException
// this actor will also fail
return Behaviors.receive(Message.class)
.onMessage(
Message.class,
(innerCtx, message) -> {
// just pass messages on to the child
middleManagement.tell(message);
return Behaviors.same();
})
.build();
});
{
// #bubbling-example
final ActorSystem<Message> system = ActorSystem.create(bossBehavior, "boss");
// #bubbling-example
}
final ActorSystem<Message> system =
ActorSystem.create(
bossBehavior,
"boss",
ConfigFactory.parseString(
"akka.loggers = [ akka.testkit.TestEventListener ]\n" + "akka.loglevel=warning"));
// #bubbling-example
// actual exception and thent the deathpacts
new EventFilter(Exception.class, ActorSystemAdapter.toUntyped(system))
.occurrences(4)
.intercept(
() -> {
// #bubbling-example
system.tell(new Fail("boom"));
// #bubbling-example
return null;
});
// #bubbling-example
// this will now bubble up all the way to the boss and as that is the user guardian it means
// the entire actor system will stop
// #bubbling-example
}
}

View file

@ -68,7 +68,7 @@ public class SupervisionCompileOnlyTest {
Behaviors.supervise(behavior)
.onFailure(
IllegalStateException.class,
SupervisorStrategy.restartWithLimit(10, FiniteDuration.apply(10, TimeUnit.SECONDS)));
SupervisorStrategy.restart().withLimit(10, FiniteDuration.apply(10, TimeUnit.SECONDS)));
// #restart-limit
// #multiple
@ -81,5 +81,54 @@ public class SupervisionCompileOnlyTest {
// #top-level
Behaviors.supervise(counter(1));
// #top-level
}
// #restart-stop-children
static Behavior<String> child(long size) {
return Behaviors.receiveMessage(msg -> child(size + msg.length()));
}
static Behavior<String> parent() {
return Behaviors.<String>supervise(
Behaviors.setup(
ctx -> {
final ActorRef<String> child1 = ctx.spawn(child(0), "child1");
final ActorRef<String> child2 = ctx.spawn(child(0), "child2");
return Behaviors.receiveMessage(
msg -> {
// there might be bugs here...
String[] parts = msg.split(" ");
child1.tell(parts[0]);
child2.tell(parts[1]);
return Behaviors.same();
});
}))
.onFailure(SupervisorStrategy.restart());
}
// #restart-stop-children
// #restart-keep-children
static Behavior<String> parent2() {
return Behaviors.setup(
ctx -> {
final ActorRef<String> child1 = ctx.spawn(child(0), "child1");
final ActorRef<String> child2 = ctx.spawn(child(0), "child2");
// supervision strategy inside the setup to not recreate children on restart
return Behaviors.<String>supervise(
Behaviors.receiveMessage(
msg -> {
// there might be bugs here...
String[] parts = msg.split(" ");
child1.tell(parts[0]);
child2.tell(parts[1]);
return Behaviors.same();
}))
.onFailure(SupervisorStrategy.restart().withStopChildren(false));
});
}
// #restart-keep-children
}

View file

@ -5,33 +5,36 @@
package akka.actor.typed
import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import akka.actor.ActorInitializationException
import akka.actor.typed.scaladsl.{ Behaviors, AbstractBehavior }
import akka.actor.typed.scaladsl.{ AbstractBehavior, Behaviors }
import akka.actor.typed.scaladsl.Behaviors._
import akka.testkit.EventFilter
import akka.actor.testkit.typed.scaladsl._
import akka.actor.testkit.typed._
import org.scalatest.{ Matchers, WordSpec, WordSpecLike }
import scala.util.control.NoStackTrace
import scala.concurrent.duration._
import akka.actor.typed.SupervisorStrategy.Resume
object SupervisionSpec {
sealed trait Command
case object Ping extends Command
case class Throw(e: Throwable) extends Command
final case class Ping(n: Int) extends Command
final case class Throw(e: Throwable) extends Command
case object IncrementState extends Command
case object GetState extends Command
case class CreateChild[T](behavior: Behavior[T], name: String) extends Command
final case class CreateChild[T](behavior: Behavior[T], name: String) extends Command
final case class Watch(ref: ActorRef[_]) extends Command
sealed trait Event
case object Pong extends Event
case class GotSignal(signal: Signal) extends Event
case class State(n: Int, children: Map[String, ActorRef[Command]]) extends Event
final case class Pong(n: Int) extends Event
final case class GotSignal(signal: Signal) extends Event
final case class State(n: Int, children: Map[String, ActorRef[Command]]) extends Event
case object Started extends Event
case object StartFailed extends Event
@ -39,14 +42,14 @@ object SupervisionSpec {
class Exc2 extends Exc1("exc-2")
class Exc3(message: String = "exc-3") extends RuntimeException(message) with NoStackTrace
def targetBehavior(monitor: ActorRef[Event], state: State = State(0, Map.empty)): Behavior[Command] =
def targetBehavior(monitor: ActorRef[Event], state: State = State(0, Map.empty), slowStop: Option[CountDownLatch] = None): Behavior[Command] =
receive[Command] { (context, cmd)
cmd match {
case Ping
monitor ! Pong
case Ping(n)
monitor ! Pong(n)
Behaviors.same
case IncrementState
targetBehavior(monitor, state.copy(n = state.n + 1))
targetBehavior(monitor, state.copy(n = state.n + 1), slowStop)
case GetState
val reply = state.copy(children = context.children.map(c c.path.name c.unsafeUpcast[Command]).toMap)
monitor ! reply
@ -54,11 +57,16 @@ object SupervisionSpec {
case CreateChild(childBehv, childName)
context.spawn(childBehv, childName)
Behaviors.same
case Watch(ref)
context.watch(ref)
Behaviors.same
case Throw(e)
throw e
}
} receiveSignal {
case (_, sig)
if (sig == PostStop)
slowStop.foreach(latch latch.await(10, TimeUnit.SECONDS))
monitor ! GotSignal(sig)
Behaviors.same
}
@ -68,7 +76,7 @@ object SupervisionSpec {
throw new RuntimeException("simulated exc from constructor") with NoStackTrace
override def onMessage(message: Command): Behavior[Command] = {
monitor ! Pong
monitor ! Pong(0)
Behaviors.same
}
}
@ -83,8 +91,8 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
val inbox = TestInbox[Event]("evt")
val behv = supervise(targetBehavior(inbox.ref)).onFailure[Throwable](SupervisorStrategy.restart)
val testkit = BehaviorTestKit(behv)
testkit.run(Ping)
inbox.receiveMessage() should ===(Pong)
testkit.run(Ping(1))
inbox.receiveMessage() should ===(Pong(1))
}
"stop when no supervise" in {
@ -178,7 +186,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
"stop after restart retries limit" in {
val inbox = TestInbox[Event]("evt")
val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 2, withinTimeRange = 1.minute)
val strategy = SupervisorStrategy.restart.withLimit(maxNrOfRetries = 2, withinTimeRange = 1.minute)
val behv = supervise(targetBehavior(inbox.ref)).onFailure[Exc1](strategy)
val testkit = BehaviorTestKit(behv)
testkit.run(Throw(new Exc1))
@ -194,7 +202,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
"reset retry limit after withinTimeRange" in {
val inbox = TestInbox[Event]("evt")
val withinTimeRange = 2.seconds
val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 2, withinTimeRange)
val strategy = SupervisorStrategy.restart.withLimit(maxNrOfRetries = 2, withinTimeRange)
val behv = supervise(targetBehavior(inbox.ref)).onFailure[Exc1](strategy)
val testkit = BehaviorTestKit(behv)
testkit.run(Throw(new Exc1))
@ -215,7 +223,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
"stop at first exception when restart retries limit is 0" in {
val inbox = TestInbox[Event]("evt")
val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 0, withinTimeRange = 1.minute)
val strategy = SupervisorStrategy.restart.withLimit(maxNrOfRetries = 0, withinTimeRange = 1.minute)
val behv = supervise(targetBehavior(inbox.ref))
.onFailure[Exc1](strategy)
val testkit = BehaviorTestKit(behv)
@ -241,6 +249,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
class SupervisionSpec extends ScalaTestWithActorTestKit(
"""
akka.loggers = [akka.testkit.TestEventListener]
akka.log-dead-letters = off
""") with WordSpecLike {
import SupervisionSpec._
@ -259,10 +268,10 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
class FailingConstructor(monitor: ActorRef[Event]) extends AbstractBehavior[Command] {
monitor ! Started
if (failCounter.getAndIncrement() < failCount) {
throw TE("simulated exc from constructor")
throw TestException("simulated exc from constructor")
}
override def onMessage(message: Command): Behavior[Command] = {
monitor ! Pong
monitor ! Pong(0)
Behaviors.same
}
}
@ -275,19 +284,19 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
val count = failCounter.getAndIncrement()
if (count < failCount) {
probe.ref ! StartFailed
throw TE(s"construction ${count} failed")
throw TestException(s"construction ${count} failed")
} else {
probe.ref ! Started
Behaviors.empty
}
}).onFailure[TE](strategy)
}).onFailure[TestException](strategy)
}
class FailingUnhandledTestSetup(strategy: SupervisorStrategy) {
val probe = TestProbe[AnyRef]("evt")
def behv = supervise(setup[Command] { _
probe.ref ! StartFailed
throw new TE("construction failed")
throw new TestException("construction failed")
}).onFailure[IllegalArgumentException](strategy)
}
@ -297,8 +306,8 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
val behv = Behaviors.supervise(targetBehavior(probe.ref))
.onFailure[Throwable](SupervisorStrategy.restart)
val ref = spawn(behv)
ref ! Ping
probe.expectMessage(Pong)
ref ! Ping(1)
probe.expectMessage(Pong(1))
}
"stop when strategy is stop" in {
@ -387,7 +396,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
val probe = TestProbe[Event]("evt")
val resetTimeout = 500.millis
val behv = Behaviors.supervise(targetBehavior(probe.ref))
.onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, resetTimeout))
.onFailure[Exc1](SupervisorStrategy.restart.withLimit(2, resetTimeout))
val ref = spawn(behv)
ref ! IncrementState
ref ! GetState
@ -401,15 +410,17 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PostStop))
}
ref ! GetState
probe.expectNoMessage()
EventFilter.warning(start = "received dead letter", occurrences = 1).intercept {
ref ! GetState
probe.expectNoMessage()
}
}
"reset fixed limit after timeout" in {
val probe = TestProbe[Event]("evt")
val resetTimeout = 500.millis
val behv = Behaviors.supervise(targetBehavior(probe.ref))
.onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, resetTimeout))
.onFailure[Exc1](SupervisorStrategy.restart.withLimit(2, resetTimeout))
val ref = spawn(behv)
ref ! IncrementState
ref ! GetState
@ -430,10 +441,63 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
probe.expectMessage(State(0, Map.empty))
}
"NOT stop children when restarting" in {
"stop children when restarting" in {
testStopChildren(strategy = SupervisorStrategy.restart)
}
"stop children when backoff" in {
testStopChildren(strategy = SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0))
}
def testStopChildren(strategy: SupervisorStrategy): Unit = {
val parentProbe = TestProbe[Event]("evt")
val behv = Behaviors.supervise(targetBehavior(parentProbe.ref))
.onFailure[Exc1](SupervisorStrategy.restart)
.onFailure[Exc1](strategy)
val ref = spawn(behv)
val anotherProbe = TestProbe[String]("another")
ref ! Watch(anotherProbe.ref)
val childProbe = TestProbe[Event]("childEvt")
val slowStop = new CountDownLatch(1)
val child1Name = nextName()
val child2Name = nextName()
ref ! CreateChild(targetBehavior(childProbe.ref, slowStop = Some(slowStop)), child1Name)
ref ! CreateChild(targetBehavior(childProbe.ref, slowStop = Some(slowStop)), child2Name)
ref ! GetState
parentProbe.expectMessageType[State].children.keySet should ===(Set(child1Name, child2Name))
EventFilter[Exc1](occurrences = 1).intercept {
ref ! Throw(new Exc1)
parentProbe.expectMessage(GotSignal(PreRestart))
ref ! GetState
anotherProbe.stop()
}
// waiting for children to stop, GetState stashed
parentProbe.expectNoMessage()
slowStop.countDown()
childProbe.expectMessage(GotSignal(PostStop))
childProbe.expectMessage(GotSignal(PostStop))
parentProbe.expectMessageType[State].children.keySet should ===(Set.empty)
// anotherProbe was stopped, Terminated signal stashed and delivered to new behavior
parentProbe.expectMessage(GotSignal(Terminated(anotherProbe.ref)))
}
"optionally NOT stop children when restarting" in {
testNotStopChildren(strategy = SupervisorStrategy.restart.withStopChildren(enabled = false))
}
"optionally NOT stop children when backoff" in {
testNotStopChildren(strategy = SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0)
.withStopChildren(enabled = false))
}
def testNotStopChildren(strategy: SupervisorStrategy): Unit = {
val parentProbe = TestProbe[Event]("evt")
val behv = Behaviors.supervise(targetBehavior(parentProbe.ref))
.onFailure[Exc1](strategy)
val ref = spawn(behv)
val childProbe = TestProbe[Event]("childEvt")
@ -447,12 +511,154 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
parentProbe.expectMessage(GotSignal(PreRestart))
ref ! GetState
}
// TODO document this difference compared to classic actors, and that
// children can be stopped if needed in PreRestart
parentProbe.expectMessageType[State].children.keySet should contain(childName)
childProbe.expectNoMessage()
}
"stop children when restarting second time during unstash" in {
testStopChildrenWhenExceptionFromUnstash(SupervisorStrategy.restart)
}
"stop children when backoff second time during unstash" in {
testStopChildrenWhenExceptionFromUnstash(
SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0))
}
def testStopChildrenWhenExceptionFromUnstash(strategy: SupervisorStrategy): Unit = {
val parentProbe = TestProbe[Event]("evt")
val behv = Behaviors.supervise(targetBehavior(parentProbe.ref))
.onFailure[Exc1](strategy)
val ref = spawn(behv)
val childProbe = TestProbe[Event]("childEvt")
val slowStop = new CountDownLatch(1)
val child1Name = nextName()
ref ! CreateChild(targetBehavior(childProbe.ref, slowStop = Some(slowStop)), child1Name)
ref ! GetState
parentProbe.expectMessageType[State].children.keySet should ===(Set(child1Name))
val child2Name = nextName()
EventFilter[Exc1](occurrences = 1).intercept {
ref ! Throw(new Exc1)
parentProbe.expectMessage(GotSignal(PreRestart))
ref ! GetState
ref ! CreateChild(targetBehavior(childProbe.ref), child2Name)
ref ! GetState
ref ! Throw(new Exc1)
}
EventFilter[Exc1](occurrences = 1).intercept {
slowStop.countDown()
childProbe.expectMessage(GotSignal(PostStop)) // child1
parentProbe.expectMessageType[State].children.keySet should ===(Set.empty)
parentProbe.expectMessageType[State].children.keySet should ===(Set(child2Name))
// the stashed Throw is causing another restart and stop of child2
childProbe.expectMessage(GotSignal(PostStop)) // child2
}
ref ! GetState
parentProbe.expectMessageType[State].children.keySet should ===(Set.empty)
}
"stop children when restart (with limit) from exception in first setup" in {
testStopChildrenWhenExceptionFromFirstSetup(SupervisorStrategy.restart.withLimit(10, 1.second))
}
"stop children when backoff from exception in first setup" in {
testStopChildrenWhenExceptionFromFirstSetup(SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0))
}
def testStopChildrenWhenExceptionFromFirstSetup(strategy: SupervisorStrategy): Unit = {
val parentProbe = TestProbe[Event]("evt")
val child1Probe = TestProbe[Event]("childEvt")
val child2Probe = TestProbe[Event]("childEvt")
val slowStop1 = new CountDownLatch(1)
val slowStop2 = new CountDownLatch(1)
val throwFromSetup = new AtomicBoolean(true)
val behv = Behaviors.supervise {
Behaviors.setup[Command] { ctx
ctx.spawn(targetBehavior(child1Probe.ref, slowStop = Some(slowStop1)), "child1")
if (throwFromSetup.get()) {
// note that this second child waiting on slowStop2 will prevent a restart loop that could exhaust the
// limit before throwFromSetup is set back to false
ctx.spawn(targetBehavior(child2Probe.ref, slowStop = Some(slowStop2)), "child2")
throw TestException("exc from setup")
}
targetBehavior(parentProbe.ref)
}
}
.onFailure[RuntimeException](strategy)
EventFilter[TestException](occurrences = 1).intercept {
val ref = spawn(behv)
slowStop1.countDown()
child1Probe.expectMessage(GotSignal(PostStop))
throwFromSetup.set(false)
slowStop2.countDown()
child2Probe.expectMessage(GotSignal(PostStop))
ref ! GetState
parentProbe.expectMessageType[State].children.keySet should ===(Set("child1"))
}
}
"stop children when restart (with limit) from exception in later setup" in {
testStopChildrenWhenExceptionFromLaterSetup(SupervisorStrategy.restart.withLimit(10, 1.second))
}
"stop children when backoff from exception in later setup" in {
testStopChildrenWhenExceptionFromLaterSetup(SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0))
}
def testStopChildrenWhenExceptionFromLaterSetup(strategy: SupervisorStrategy): Unit = {
val parentProbe = TestProbe[Event]("evt")
val child1Probe = TestProbe[Event]("childEvt")
val child2Probe = TestProbe[Event]("childEvt")
val slowStop1 = new CountDownLatch(1)
val slowStop2 = new CountDownLatch(1)
val throwFromSetup = new AtomicBoolean(false)
val behv = Behaviors.supervise {
Behaviors.setup[Command] { ctx
ctx.spawn(targetBehavior(child1Probe.ref, slowStop = Some(slowStop1)), "child1")
if (throwFromSetup.get()) {
// note that this second child waiting on slowStop2 will prevent a restart loop that could exhaust the
// limit before throwFromSetup is set back to false
ctx.spawn(targetBehavior(child2Probe.ref, slowStop = Some(slowStop2)), "child2")
throw TestException("exc from setup")
}
targetBehavior(parentProbe.ref)
}
}
.onFailure[RuntimeException](strategy)
val ref = spawn(behv)
ref ! GetState
parentProbe.expectMessageType[State].children.keySet should ===(Set("child1"))
throwFromSetup.set(true)
EventFilter[Exc1](occurrences = 1).intercept {
ref ! Throw(new Exc1)
parentProbe.expectMessage(GotSignal(PreRestart))
}
EventFilter[TestException](occurrences = 1).intercept {
slowStop1.countDown()
child1Probe.expectMessage(GotSignal(PostStop))
child1Probe.expectMessage(GotSignal(PostStop))
throwFromSetup.set(false)
slowStop2.countDown()
child2Probe.expectMessage(GotSignal(PostStop))
}
ref ! GetState
parentProbe.expectMessageType[State].children.keySet should ===(Set("child1"))
}
"resume when handled exception" in {
val probe = TestProbe[Event]("evt")
val behv = supervise(targetBehavior(probe.ref)).onFailure[Exc1](SupervisorStrategy.resume)
@ -502,12 +708,12 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
}
}
"publish dropped messages while backing off" in {
"publish dropped messages while backing off and stash is full" in {
val probe = TestProbe[Event]("evt")
val startedProbe = TestProbe[Event]("started")
val minBackoff = 10.seconds
val minBackoff = 1.seconds
val strategy = SupervisorStrategy
.restartWithBackoff(minBackoff, minBackoff, 0.0)
.restartWithBackoff(minBackoff, minBackoff, 0.0).withStashCapacity(2)
val behv = Behaviors.supervise(Behaviors.setup[Command] { _
startedProbe.ref ! Started
targetBehavior(probe.ref)
@ -521,8 +727,14 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
ref ! Throw(new Exc1)
probe.expectMessage(GotSignal(PreRestart))
}
ref ! Ping
droppedMessagesProbe.expectMessage(Dropped(Ping, ref))
ref ! Ping(1)
ref ! Ping(2)
ref ! Ping(3)
ref ! Ping(4)
probe.expectMessage(Pong(1))
probe.expectMessage(Pong(2))
droppedMessagesProbe.expectMessage(Dropped(Ping(3), ref))
droppedMessagesProbe.expectMessage(Dropped(Ping(4), ref))
}
"restart after exponential backoff" in {
@ -532,6 +744,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
val strategy = SupervisorStrategy
.restartWithBackoff(minBackoff, 10.seconds, 0.0)
.withResetBackoffAfter(10.seconds)
.withStashCapacity(0)
val behv = Behaviors.supervise(Behaviors.setup[Command] { _
startedProbe.ref ! Started
targetBehavior(probe.ref)
@ -543,7 +756,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
ref ! IncrementState
ref ! Throw(new Exc1)
probe.expectMessage(GotSignal(PreRestart))
ref ! Ping // dropped due to backoff
ref ! Ping(1) // dropped due to backoff, no stashing
}
startedProbe.expectNoMessage(minBackoff - 100.millis)
@ -557,7 +770,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
ref ! IncrementState
ref ! Throw(new Exc1)
probe.expectMessage(GotSignal(PreRestart))
ref ! Ping // dropped due to backoff
ref ! Ping(2) // dropped due to backoff, no stashing
}
startedProbe.expectNoMessage((minBackoff * 2) - 100.millis)
@ -578,18 +791,18 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
val alreadyStarted = new AtomicBoolean(false)
val behv = Behaviors.supervise(Behaviors.setup[Command] { _
if (alreadyStarted.get()) throw TE("failure to restart")
if (alreadyStarted.get()) throw TestException("failure to restart")
alreadyStarted.set(true)
startedProbe.ref ! Started
Behaviors.receiveMessage {
Behaviors.receiveMessagePartial {
case Throw(boom) throw boom
}
}).onFailure[Exception](strategy)
val ref = spawn(behv)
EventFilter[Exc1](occurrences = 1).intercept {
EventFilter[TE](occurrences = 2).intercept {
EventFilter[TestException](occurrences = 2).intercept {
startedProbe.expectMessage(Started)
ref ! Throw(new Exc1)
probe.expectTerminated(ref, 3.seconds)
@ -602,6 +815,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
val minBackoff = 1.seconds
val strategy = SupervisorStrategy.restartWithBackoff(minBackoff, 10.seconds, 0.0)
.withResetBackoffAfter(100.millis)
.withStashCapacity(0)
val behv = supervise(targetBehavior(probe.ref)).onFailure[Exc1](strategy)
val ref = spawn(behv)
@ -609,7 +823,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
ref ! IncrementState
ref ! Throw(new Exc1)
probe.expectMessage(GotSignal(PreRestart))
ref ! Ping // dropped due to backoff
ref ! Ping(1) // dropped due to backoff, no stash
}
probe.expectNoMessage(minBackoff + 100.millis.dilated)
@ -622,7 +836,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
ref ! IncrementState
ref ! Throw(new Exc1)
probe.expectMessage(GotSignal(PreRestart))
ref ! Ping // dropped due to backoff
ref ! Ping(2) // dropped due to backoff
}
// backoff was reset, so restarted after the minBackoff
@ -663,8 +877,10 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
failCount = 1,
strategy = SupervisorStrategy.resume
) {
EventFilter[ActorInitializationException](occurrences = 1).intercept {
spawn(behv)
EventFilter[TestException](occurrences = 1).intercept {
EventFilter[ActorInitializationException](occurrences = 1).intercept {
spawn(behv)
}
}
}
@ -673,7 +889,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
strategy = SupervisorStrategy.restartWithBackoff(minBackoff = 100.millis.dilated, maxBackoff = 1.second, 0)
) {
EventFilter[TE](occurrences = 1).intercept {
EventFilter[TestException](occurrences = 1).intercept {
spawn(behv)
probe.expectMessage(StartFailed)
@ -692,12 +908,12 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
}
}
"restartWithLimit when deferred factory throws" in new FailingDeferredTestSetup(
"restart.withLimit when deferred factory throws" in new FailingDeferredTestSetup(
failCount = 1,
strategy = SupervisorStrategy.restartWithLimit(3, 1.second)
strategy = SupervisorStrategy.restart.withLimit(3, 1.second)
) {
EventFilter[TE](occurrences = 1).intercept {
EventFilter[TestException](occurrences = 1).intercept {
spawn(behv)
probe.expectMessage(StartFailed)
@ -705,16 +921,18 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
}
}
"fail after more than limit in restartWithLimit when deferred factory throws" in new FailingDeferredTestSetup(
failCount = 3,
strategy = SupervisorStrategy.restartWithLimit(2, 1.second)
"fail after more than limit in restart.withLimit when deferred factory throws" in new FailingDeferredTestSetup(
failCount = 20,
strategy = SupervisorStrategy.restart.withLimit(2, 1.second)
) {
EventFilter[ActorInitializationException](occurrences = 1).intercept {
EventFilter[TE](occurrences = 1).intercept {
EventFilter[TestException](occurrences = 2).intercept {
spawn(behv)
// restarted 2 times before it gave up
// first one from initial setup
probe.expectMessage(StartFailed)
// and then restarted 2 times before it gave up
probe.expectMessage(StartFailed)
probe.expectMessage(StartFailed)
probe.expectNoMessage(100.millis)
@ -723,7 +941,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
}
"fail instead of restart with limit when deferred factory throws unhandled" in new FailingUnhandledTestSetup(
strategy = SupervisorStrategy.restartWithLimit(3, 1.second)) {
strategy = SupervisorStrategy.restart.withLimit(3, 1.second)) {
EventFilter[ActorInitializationException](occurrences = 1).intercept {
spawn(behv)
@ -743,10 +961,10 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
}
"work with nested supervisions and defers" in {
val strategy = SupervisorStrategy.restartWithLimit(3, 1.second)
val strategy = SupervisorStrategy.restart.withLimit(3, 1.second)
val probe = TestProbe[AnyRef]("p")
val beh = supervise[String](setup(context
supervise[String](setup { context
val beh = supervise[String](setup(_
supervise[String](setup { _
probe.ref ! Started
Behaviors.empty[String]
}).onFailure[RuntimeException](strategy)
@ -759,7 +977,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
"replace supervision when new returned behavior catches same exception" in {
val probe = TestProbe[AnyRef]("probeMcProbeFace")
val behv = supervise[String](Behaviors.receiveMessage {
case "boom" throw TE("boom indeed")
case "boom" throw TestException("boom indeed")
case "switch"
supervise[String](
supervise[String](
@ -767,7 +985,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
supervise[String](
supervise[String](
Behaviors.receiveMessage {
case "boom" throw TE("boom indeed")
case "boom" throw TestException("boom indeed")
case "ping"
probe.ref ! "pong"
Behaviors.same
@ -776,7 +994,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
Behaviors.stopped
}).onFailure[RuntimeException](SupervisorStrategy.resume)
).onFailure[RuntimeException](SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 23D))
).onFailure[RuntimeException](SupervisorStrategy.restartWithLimit(23, 10.seconds))
).onFailure[RuntimeException](SupervisorStrategy.restart.withLimit(23, 10.seconds))
).onFailure[IllegalArgumentException](SupervisorStrategy.restart)
).onFailure[RuntimeException](SupervisorStrategy.restart)
}).onFailure[RuntimeException](SupervisorStrategy.stop)
@ -812,15 +1030,15 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
}
val behv = supervise[String](Behaviors.receiveMessage {
case "boom" throw TE("boom indeed")
case "boom" throw TestException("boom indeed")
case "switch"
supervise[String](
setup(context
setup(_
supervise[String](
Behaviors.intercept(whateverInterceptor)(
supervise[String](
Behaviors.receiveMessage {
case "boom" throw TE("boom indeed")
case "boom" throw TestException("boom indeed")
case "ping"
probe.ref ! "pong"
Behaviors.same
@ -829,7 +1047,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
Behaviors.stopped
}).onFailure[RuntimeException](SupervisorStrategy.resume)
)
).onFailure[IllegalArgumentException](SupervisorStrategy.restartWithLimit(23, 10.seconds))
).onFailure[IllegalArgumentException](SupervisorStrategy.restart.withLimit(23, 10.seconds))
)
).onFailure[RuntimeException](SupervisorStrategy.restart)
}).onFailure[RuntimeException](SupervisorStrategy.stop)
@ -846,14 +1064,14 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
actor ! "give me stacktrace"
val stacktrace = probe.expectMessageType[Vector[StackTraceElement]]
stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.SimpleSupervisor.aroundReceive")) should ===(2)
stacktrace.count(_.toString.contains("Supervisor.aroundReceive")) should ===(2)
}
"replace backoff supervision duplicate when behavior is created in a setup" in {
val probe = TestProbe[AnyRef]("probeMcProbeFace")
val restartCount = new AtomicInteger(0)
val behv = supervise[String](
Behaviors.setup { context
Behaviors.setup { _
// a bit superficial, but just to be complete
if (restartCount.incrementAndGet() == 1) {
@ -861,7 +1079,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
Behaviors.receiveMessage {
case "boom"
probe.ref ! "crashing 1"
throw TE("boom indeed")
throw TestException("boom indeed")
case "ping"
probe.ref ! "pong 1"
Behaviors.same
@ -872,12 +1090,12 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
Behaviors.receiveMessage {
case "boom"
probe.ref ! "crashing 2"
throw TE("boom indeed")
throw TestException("boom indeed")
case "ping"
probe.ref ! "pong 2"
Behaviors.same
}
).onFailure[TE](SupervisorStrategy.resume)
).onFailure[TestException](SupervisorStrategy.resume)
}
}
).onFailure(SupervisorStrategy.restartWithBackoff(100.millis, 1.second, 0))
@ -886,16 +1104,17 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
probe.expectMessage("started 1")
ref ! "ping"
probe.expectMessage("pong 1")
EventFilter[TE](occurrences = 1).intercept {
EventFilter[TestException](occurrences = 1).intercept {
ref ! "boom"
probe.expectMessage("crashing 1")
ref ! "ping"
probe.expectNoMessage(100.millis)
}
probe.expectMessage("started 2")
probe.expectMessage("pong 2") // from "ping" that was stashed
ref ! "ping"
probe.expectMessage("pong 2")
EventFilter[TE](occurrences = 1).intercept {
EventFilter[TestException](occurrences = 1).intercept {
ref ! "boom" // now we should have replaced supervision with the resuming one
probe.expectMessage("crashing 2")
}
@ -941,7 +1160,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
SupervisorStrategy.restart,
SupervisorStrategy.resume,
SupervisorStrategy.restartWithBackoff(1.millis, 100.millis, 2D),
SupervisorStrategy.restartWithLimit(1, 100.millis)
SupervisorStrategy.restart.withLimit(1, 100.millis)
)
allStrategies.foreach { strategy
@ -978,13 +1197,13 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
} else {
stopInSetup.set(true)
Behaviors.receiveMessage {
case "boom" throw TE("boom")
case "boom" throw TestException("boom")
}
}
}).onFailure[TE](strategy)
}).onFailure[TestException](strategy)
)
EventFilter[TE](occurrences = 1).intercept {
EventFilter[TestException](occurrences = 1).intercept {
actor ! "boom"
}
createTestProbe().expectTerminated(actor, 3.second)

View file

@ -5,8 +5,9 @@
package akka.actor.typed.scaladsl
import scala.concurrent.Promise
import akka.Done
import akka.actor.testkit.typed.TE
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed
import akka.actor.typed.Behavior
@ -76,7 +77,7 @@ class StopSpec extends ScalaTestWithActorTestKit with WordSpecLike {
Behaviors.stopped(
// illegal:
Behaviors.setup[String] { _
throw TE("boom!")
throw TestException("boom!")
}
)
}

View file

@ -6,12 +6,18 @@ package akka.actor.typed.scaladsl.adapter
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated }
import akka.actor.InvalidMessageException
import akka.actor.testkit.typed.TE
import akka.actor.testkit.typed.TestException
import akka.actor.typed.scaladsl.Behaviors
import akka.{ Done, NotUsed, actor untyped }
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Terminated
import akka.testkit._
import akka.Done
import akka.NotUsed
import akka.{ actor untyped }
object AdapterSpec {
val untyped1: untyped.Props = untyped.Props(new Untyped1)
@ -69,7 +75,7 @@ object AdapterSpec {
def unhappyTyped(msg: String): Behavior[String] = Behaviors.setup[String] { ctx
val child = ctx.spawnAnonymous(Behaviors.receiveMessage[String] { _
throw TE(msg)
throw TestException(msg)
})
child ! "throw please"
Behaviors.empty

View file

@ -7,9 +7,10 @@ package docs.akka.typed.supervision
import akka.actor.typed.ActorRef
import akka.actor.typed.{ Behavior, SupervisorStrategy }
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._
import akka.actor.TypedActor.PreRestart
object SupervisionCompileOnly {
val behavior = Behaviors.empty[String]
@ -26,7 +27,7 @@ object SupervisionCompileOnly {
//#restart-limit
Behaviors.supervise(behavior)
.onFailure[IllegalStateException](SupervisorStrategy.restartWithLimit(
.onFailure[IllegalStateException](SupervisorStrategy.restart.withLimit(
maxNrOfRetries = 10, withinTimeRange = 10.seconds
))
//#restart-limit
@ -54,4 +55,46 @@ object SupervisionCompileOnly {
//#top-level
Behaviors.supervise(counter(1))
//#top-level
//#restart-stop-children
def child(size: Long): Behavior[String] =
Behaviors.receiveMessage(msg child(size + msg.length))
def parent: Behavior[String] = {
Behaviors.supervise[String] {
Behaviors.setup { ctx
val child1 = ctx.spawn(child(0), "child1")
val child2 = ctx.spawn(child(0), "child2")
Behaviors.receiveMessage[String] { msg
// there might be bugs here...
val parts = msg.split(" ")
child1 ! parts(0)
child2 ! parts(1)
Behaviors.same
}
}
}.onFailure(SupervisorStrategy.restart)
}
//#restart-stop-children
//#restart-keep-children
def parent2: Behavior[String] = {
Behaviors.setup { ctx
val child1 = ctx.spawn(child(0), "child1")
val child2 = ctx.spawn(child(0), "child2")
// supervision strategy inside the setup to not recreate children on restart
Behaviors.supervise {
Behaviors.receiveMessage[String] { msg
// there might be bugs here...
val parts = msg.split(" ")
child1 ! parts(0)
child2 ! parts(1)
Behaviors.same
}
}.onFailure(SupervisorStrategy.restart.withStopChildren(false))
}
}
//#restart-keep-children
}

View file

@ -17,6 +17,12 @@ akka.actor.typed {
# Receptionist is started eagerly to allow clustered receptionist to gather remote registrations early on.
library-extensions += "akka.actor.typed.receptionist.Receptionist"
# While an actor is restarted (waiting for backoff to expire and children to stop)
# incoming messages and signals are stashed, and delivered later to the newly restarted
# behavior. This property defines the capacity in number of messages of the stash
# buffer. If the capacity is exceed then additional incoming messages are dropped.
restart-stash-capacity = 1000
}
# Load typed extensions by an untyped unextension.

View file

@ -5,20 +5,25 @@
package akka.actor.typed
import akka.{ actor untyped }
import java.util.concurrent.{ CompletionStage, ThreadFactory }
import java.util.concurrent.CompletionStage
import java.util.concurrent.ThreadFactory
import akka.actor.setup.ActorSystemSetup
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future
import akka.actor.typed.internal.adapter.{ ActorSystemAdapter, PropsAdapter }
import akka.util.Timeout
import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
import akka.actor.BootstrapSetup
import akka.actor.setup.ActorSystemSetup
import akka.actor.typed.internal.InternalRecipientRef
import akka.actor.typed.internal.adapter.GuardianActorAdapter
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.internal.adapter.PropsAdapter
import akka.actor.typed.receptionist.Receptionist
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.util.Helpers.Requiring
import akka.util.Timeout
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
/**
* An ActorSystem is home to a hierarchy of Actors. It is created using
@ -237,7 +242,8 @@ object ActorSystem {
* Wrap an untyped [[akka.actor.ActorSystem]] such that it can be used from
* Akka Typed [[Behavior]].
*/
def wrap(system: untyped.ActorSystem): ActorSystem[Nothing] = ActorSystemAdapter.AdapterExtension(system.asInstanceOf[untyped.ActorSystemImpl]).adapter
def wrap(system: untyped.ActorSystem): ActorSystem[Nothing] =
ActorSystemAdapter.AdapterExtension(system.asInstanceOf[untyped.ActorSystemImpl]).adapter
}
/**
@ -245,19 +251,23 @@ object ActorSystem {
* This class is immutable.
*/
final class Settings(val config: Config, val untypedSettings: untyped.ActorSystem.Settings, val name: String) {
def this(_cl: ClassLoader, _config: Config, name: String) = this({
val config = _config.withFallback(ConfigFactory.defaultReference(_cl))
config.checkValid(ConfigFactory.defaultReference(_cl), "akka")
config
}, new untyped.ActorSystem.Settings(_cl, _config, name), name)
def this(classLoader: ClassLoader, config: Config, name: String) = this({
val cfg = config.withFallback(ConfigFactory.defaultReference(classLoader))
cfg.checkValid(ConfigFactory.defaultReference(classLoader), "akka")
cfg
}, new untyped.ActorSystem.Settings(classLoader, config, name), name)
def this(settings: untyped.ActorSystem.Settings) = this(settings.config, settings, settings.name)
private var foundSettings = List.empty[String]
foundSettings = foundSettings.reverse
def setup: ActorSystemSetup = untypedSettings.setup
override def toString: String = s"Settings($name,\n ${foundSettings.mkString("\n ")})"
/**
* Returns the String representation of the Config that this Settings is backed by
*/
override def toString: String = config.root.render
private val typedConfig = config.getConfig("akka.actor.typed")
val RestartStashCapacity: Int = typedConfig.getInt("restart-stash-capacity")
.requiring(_ >= 0, "restart-stash-capacity must be >= 0")
}

View file

@ -34,8 +34,7 @@ trait Signal
/**
* Lifecycle signal that is fired upon restart of the Actor before replacing
* the behavior with the fresh one (i.e. this signal is received within the
* behavior that failed). The replacement behavior will receive PreStart as its
* first signal.
* behavior that failed).
*/
sealed abstract class PreRestart extends Signal
case object PreRestart extends PreRestart {
@ -80,6 +79,15 @@ object Terminated {
sealed class Terminated(val ref: ActorRef[Nothing]) extends Signal {
/** Java API: The actor that was watched and got terminated */
def getRef(): ActorRef[Void] = ref.asInstanceOf[ActorRef[Void]]
override def toString: String = s"Terminated($ref)"
override def hashCode(): Int = ref.hashCode()
override def equals(obj: Any): Boolean = obj match {
case Terminated(`ref`) true
case _ false
}
}
object ChildFailed {
@ -96,4 +104,13 @@ final class ChildFailed(ref: ActorRef[Nothing], val cause: Throwable) extends Te
* Java API
*/
def getCause(): Throwable = cause
override def toString: String = s"ChildFailed($ref,${cause.getClass.getName})"
override def hashCode(): Int = ref.hashCode()
override def equals(obj: Any): Boolean = obj match {
case ChildFailed(`ref`, `cause`) true
case _ false
}
}

View file

@ -22,50 +22,20 @@ object SupervisorStrategy {
val resume: SupervisorStrategy = Resume(loggingEnabled = true)
/**
* Restart immediately without any limit on number of restart retries.
* Restart immediately without any limit on number of restart retries. A limit can be
* added with [[RestartSupervisorStrategy.withLimit]].
*
* If the actor behavior is deferred and throws an exception on startup the actor is stopped
* (restarting would be dangerous as it could lead to an infinite restart-loop)
*/
val restart: SupervisorStrategy = Restart(-1, Duration.Zero, loggingEnabled = true)
val restart: RestartSupervisorStrategy =
Restart(maxRestarts = -1, withinTimeRange = Duration.Zero)
/**
* Stop the actor
*/
val stop: SupervisorStrategy = Stop(loggingEnabled = true)
/**
* Scala API: Restart with a limit of number of restart retries.
* The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`)
* within a time range (`withinTimeRange`). When the time window has elapsed without reaching
* `maxNrOfRetries` the restart count is reset.
*
* The strategy is applied also if the actor behavior is deferred and throws an exception during
* startup.
*
* @param maxNrOfRetries the number of times a child actor is allowed to be restarted,
* if the limit is exceeded the child actor is stopped
* @param withinTimeRange duration of the time window for maxNrOfRetries
*/
def restartWithLimit(maxNrOfRetries: Int, withinTimeRange: FiniteDuration): SupervisorStrategy =
Restart(maxNrOfRetries, withinTimeRange, loggingEnabled = true)
/**
* Java API: Restart with a limit of number of restart retries.
* The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`)
* within a time range (`withinTimeRange`). When the time window has elapsed without reaching
* `maxNrOfRetries` the restart count is reset.
*
* The strategy is applied also if the actor behavior is deferred and throws an exception during
* startup.
*
* @param maxNrOfRetries the number of times a child actor is allowed to be restarted,
* if the limit is exceeded the child actor is stopped
* @param withinTimeRange duration of the time window for maxNrOfRetries
*/
def restartWithLimit(maxNrOfRetries: Int, withinTimeRange: java.time.Duration): SupervisorStrategy =
restartWithLimit(maxNrOfRetries, withinTimeRange.asScala)
/**
* Scala API: It supports exponential back-off between the given `minBackoff` and
* `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and
@ -99,7 +69,7 @@ object SupervisorStrategy {
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double): BackoffSupervisorStrategy =
Backoff(minBackoff, maxBackoff, randomFactor, resetBackoffAfter = minBackoff, loggingEnabled = true, maxRestarts = -1)
Backoff(minBackoff, maxBackoff, randomFactor, resetBackoffAfter = minBackoff)
/**
* Java API: It supports exponential back-off between the given `minBackoff` and
@ -153,15 +123,40 @@ object SupervisorStrategy {
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Restart(
maxNrOfRetries: Int,
withinTimeRange: FiniteDuration,
loggingEnabled: Boolean) extends SupervisorStrategy {
@InternalApi private[akka] sealed trait RestartOrBackoff extends SupervisorStrategy {
def maxRestarts: Int
def stopChildren: Boolean
def stashCapacity: Int
def loggingEnabled: Boolean
override def withLoggingEnabled(enabled: Boolean): SupervisorStrategy =
def unlimitedRestarts(): Boolean = maxRestarts == -1
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Restart(
maxRestarts: Int,
withinTimeRange: FiniteDuration,
loggingEnabled: Boolean = true,
stopChildren: Boolean = true,
stashCapacity: Int = -1) extends RestartSupervisorStrategy with RestartOrBackoff {
override def withLimit(maxNrOfRetries: Int, withinTimeRange: FiniteDuration): RestartSupervisorStrategy =
copy(maxNrOfRetries, withinTimeRange)
override def withLimit(maxNrOfRetries: Int, withinTimeRange: java.time.Duration): RestartSupervisorStrategy =
copy(maxNrOfRetries, withinTimeRange.asScala)
override def withStopChildren(enabled: Boolean): RestartSupervisorStrategy =
copy(stopChildren = enabled)
override def withStashCapacity(capacity: Int): RestartSupervisorStrategy =
copy(stashCapacity = capacity)
override def withLoggingEnabled(enabled: Boolean): RestartSupervisorStrategy =
copy(loggingEnabled = enabled)
def unlimitedRestarts(): Boolean = maxNrOfRetries == -1
}
/**
@ -172,10 +167,12 @@ object SupervisorStrategy {
maxBackoff: FiniteDuration,
randomFactor: Double,
resetBackoffAfter: FiniteDuration,
loggingEnabled: Boolean,
maxRestarts: Int) extends BackoffSupervisorStrategy {
loggingEnabled: Boolean = true,
maxRestarts: Int = -1,
stopChildren: Boolean = true,
stashCapacity: Int = -1) extends BackoffSupervisorStrategy with RestartOrBackoff {
override def withLoggingEnabled(enabled: Boolean): SupervisorStrategy =
override def withLoggingEnabled(enabled: Boolean): BackoffSupervisorStrategy =
copy(loggingEnabled = enabled)
override def withResetBackoffAfter(timeout: FiniteDuration): BackoffSupervisorStrategy =
@ -188,6 +185,12 @@ object SupervisorStrategy {
override def withMaxRestarts(maxRestarts: Int): BackoffSupervisorStrategy =
copy(maxRestarts = maxRestarts)
override def withStopChildren(enabled: Boolean): BackoffSupervisorStrategy =
copy(stopChildren = enabled)
override def withStashCapacity(capacity: Int): BackoffSupervisorStrategy =
copy(stashCapacity = capacity)
}
}
@ -197,6 +200,59 @@ sealed abstract class SupervisorStrategy {
def withLoggingEnabled(on: Boolean): SupervisorStrategy
}
sealed abstract class RestartSupervisorStrategy extends SupervisorStrategy {
/**
* Scala API: Restart with a limit of number of restart retries.
* The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`)
* within a time range (`withinTimeRange`). When the time window has elapsed without reaching
* `maxNrOfRetries` the restart count is reset.
*
* The strategy is applied also if the actor behavior is deferred and throws an exception during
* startup.
*
* @param maxNrOfRetries the number of times a child actor is allowed to be restarted,
* if the limit is exceeded the child actor is stopped
* @param withinTimeRange duration of the time window for maxNrOfRetries
*/
def withLimit(maxNrOfRetries: Int, withinTimeRange: FiniteDuration): RestartSupervisorStrategy
/**
* Java API: Restart with a limit of number of restart retries.
* The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`)
* within a time range (`withinTimeRange`). When the time window has elapsed without reaching
* `maxNrOfRetries` the restart count is reset.
*
* The strategy is applied also if the actor behavior is deferred and throws an exception during
* startup.
*
* @param maxNrOfRetries the number of times a child actor is allowed to be restarted,
* if the limit is exceeded the child actor is stopped
* @param withinTimeRange duration of the time window for maxNrOfRetries
*/
def withLimit(maxNrOfRetries: Int, withinTimeRange: java.time.Duration): RestartSupervisorStrategy
/**
* Stop or keep child actors when the parent actor is restarted.
* By default child actors are stopped when parent is restarted.
* @param enabled if `true` then child actors are stopped, otherwise they are kept
*/
def withStopChildren(enabled: Boolean): RestartSupervisorStrategy
/**
* While restarting (waiting for children to stop) incoming messages and signals are
* stashed, and delivered later to the newly restarted behavior. This property defines
* the capacity in number of messages of the stash buffer. If the capacity is exceed
* then additional incoming messages are dropped.
*
* By default the capacity is defined by config property `akka.actor.typed.restart-stash-capacity`.
*/
def withStashCapacity(capacity: Int): RestartSupervisorStrategy
override def withLoggingEnabled(enabled: Boolean): RestartSupervisorStrategy
}
sealed abstract class BackoffSupervisorStrategy extends SupervisorStrategy {
def resetBackoffAfter: FiniteDuration
@ -221,4 +277,24 @@ sealed abstract class BackoffSupervisorStrategy extends SupervisorStrategy {
* the upper limit on restarts (and is the default)
*/
def withMaxRestarts(maxRestarts: Int): BackoffSupervisorStrategy
/**
* Stop or keep child actors when the parent actor is restarted.
* By default child actors are stopped when parent is restarted.
* @param enabled if `true` then child actors are stopped, otherwise they are kept
*/
def withStopChildren(enabled: Boolean): BackoffSupervisorStrategy
/**
* While restarting (waiting for backoff to expire and children to stop) incoming
* messages and signals are stashed, and delivered later to the newly restarted
* behavior. This property defines the capacity in number of messages of the stash
* buffer. If the capacity is exceed then additional incoming messages are dropped.
*
* By default the capacity is defined by config property `akka.actor.typed.restart-stash-capacity`.
*/
def withStashCapacity(capacity: Int): BackoffSupervisorStrategy
override def withLoggingEnabled(enabled: Boolean): BackoffSupervisorStrategy
}

View file

@ -7,32 +7,36 @@ package internal
import java.util.concurrent.ThreadLocalRandom
import akka.actor.DeadLetterSuppression
import akka.actor.typed.BehaviorInterceptor.{ PreStartTarget, ReceiveTarget, SignalTarget }
import akka.actor.typed.SupervisorStrategy._
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.util.{ OptionVal, unused }
import scala.concurrent.duration.{ Deadline, FiniteDuration }
import scala.concurrent.duration.Deadline
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import scala.util.control.Exception.Catcher
import scala.util.control.NonFatal
import akka.actor.DeadLetterSuppression
import akka.actor.typed.BehaviorInterceptor.PreStartTarget
import akka.actor.typed.BehaviorInterceptor.ReceiveTarget
import akka.actor.typed.BehaviorInterceptor.SignalTarget
import akka.actor.typed.SupervisorStrategy._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.StashBuffer
import akka.annotation.InternalApi
import akka.event.Logging
import akka.util.OptionVal
import akka.util.unused
/**
* INTERNAL API
*/
@InternalApi private[akka] object Supervisor {
def apply[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], strategy: SupervisorStrategy): Behavior[T] = {
strategy match {
case r: RestartOrBackoff
Behaviors.intercept[T, T](new RestartSupervisor(initialBehavior, r))(initialBehavior)
case r: Resume
Behaviors.intercept[T, T](new ResumeSupervisor(r))(initialBehavior)
case r: Restart
Behaviors.intercept[T, T](new RestartSupervisor(initialBehavior, r))(initialBehavior)
case r: Stop
Behaviors.intercept[T, T](new StopSupervisor(r))(initialBehavior)
case r: Backoff
Behaviors.intercept[T, T](new BackoffSupervisor(initialBehavior, r))(initialBehavior)
Behaviors.intercept[T, T](new StopSupervisor(initialBehavior, r))(initialBehavior)
}
}
}
@ -55,7 +59,7 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe
override def aroundStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Behavior[I] = {
try {
target.start(ctx)
} catch handleExceptionOnStart(ctx)
} catch handleExceptionOnStart(ctx, target)
}
def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = {
@ -70,9 +74,16 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe
}
}
protected def handleExceptionOnStart(ctx: TypedActorContext[O]): Catcher[Behavior[I]]
def dropped(ctx: TypedActorContext[_], signalOrMessage: Any): Unit = {
import akka.actor.typed.scaladsl.adapter._
ctx.asScala.system.toUntyped.eventStream.publish(Dropped(signalOrMessage, ctx.asScala.self))
}
protected def handleExceptionOnStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Catcher[Behavior[I]]
protected def handleSignalException(ctx: TypedActorContext[O], target: SignalTarget[I]): Catcher[Behavior[I]]
protected def handleReceiveException(ctx: TypedActorContext[O], target: ReceiveTarget[I]): Catcher[Behavior[I]]
override def toString: String = Logging.simpleName(getClass)
}
/**
@ -92,7 +103,7 @@ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: Super
}
// convenience if target not required to handle exception
protected def handleExceptionOnStart(ctx: TypedActorContext[T]): Catcher[Behavior[T]] =
protected def handleExceptionOnStart(ctx: TypedActorContext[T], target: PreStartTarget[T]): Catcher[Behavior[T]] =
handleException(ctx)
protected def handleSignalException(ctx: TypedActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] =
handleException(ctx)
@ -100,7 +111,8 @@ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: Super
handleException(ctx)
}
private class StopSupervisor[T, Thr <: Throwable: ClassTag](strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) {
private class StopSupervisor[T, Thr <: Throwable: ClassTag](@unused initial: Behavior[T], strategy: Stop)
extends SimpleSupervisor[T, Thr](strategy) {
override def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
log(ctx, t)
@ -116,149 +128,7 @@ private class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extend
}
}
private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) {
private var restarts = 0
private var deadline: OptionVal[Deadline] = OptionVal.None
private def deadlineHasTimeLeft: Boolean = deadline match {
case OptionVal.None true
case OptionVal.Some(d) d.hasTimeLeft
}
override def aroundStart(ctx: TypedActorContext[T], target: PreStartTarget[T]): Behavior[T] = {
try {
target.start(ctx)
} catch {
case NonFatal(t: Thr)
// if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop
if (strategy.unlimitedRestarts() || ((restarts + 1) >= strategy.maxNrOfRetries && deadlineHasTimeLeft)) {
// don't log here as it'll be logged as ActorInitializationException
throw t
} else {
log(ctx, t)
restart()
aroundStart(ctx, target)
}
}
}
private def restart() = {
val timeLeft = deadlineHasTimeLeft
val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange)
restarts = if (timeLeft) restarts + 1 else 1
deadline = newDeadline
}
private def handleException(ctx: TypedActorContext[T], signalRestart: () Unit): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
if (strategy.maxNrOfRetries != -1 && restarts >= strategy.maxNrOfRetries && deadlineHasTimeLeft) {
throw t
} else {
try {
signalRestart()
} catch {
case NonFatal(ex) ctx.asScala.log.error(ex, "failure during PreRestart")
}
log(ctx, t)
restart()
Behavior.validateAsInitial(Behavior.start(initial, ctx))
}
}
override protected def handleSignalException(ctx: TypedActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = {
handleException(ctx, () target(ctx, PreRestart))
}
override protected def handleReceiveException(ctx: TypedActorContext[T], target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
handleException(ctx, () target.signalRestart(ctx))
}
}
private class BackoffSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: Backoff) extends AbstractSupervisor[O, T, Thr](b) {
import BackoffSupervisor._
var blackhole = false
var restartCount: Int = 0
override def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
if (blackhole) {
import akka.actor.typed.scaladsl.adapter._
ctx.asScala.system.toUntyped.eventStream.publish(Dropped(signal, ctx.asScala.self))
Behaviors.same
} else {
super.aroundSignal(ctx, signal, target)
}
}
override def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[T]): Behavior[T] = {
try {
msg.asInstanceOf[Any] match {
case ScheduledRestart
blackhole = false
ctx.asScala.scheduleOnce(b.resetBackoffAfter, ctx.asScala.self.unsafeUpcast[Any], ResetRestartCount(restartCount))
try {
Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[TypedActorContext[T]]))
} catch {
case NonFatal(ex: Thr) if b.maxRestarts > 0 && restartCount >= b.maxRestarts
log(ctx, ex)
Behavior.failed(ex)
case NonFatal(ex: Thr) scheduleRestart(ctx, ex)
}
case ResetRestartCount(current)
if (current == restartCount) {
restartCount = 0
}
Behavior.same
case _
if (blackhole) {
import akka.actor.typed.scaladsl.adapter._
ctx.asScala.system.toUntyped.eventStream.publish(Dropped(msg, ctx.asScala.self.unsafeUpcast[Any]))
Behaviors.same
} else {
target(ctx, msg.asInstanceOf[T])
}
}
} catch handleReceiveException(ctx, target)
}
protected def handleExceptionOnStart(ctx: TypedActorContext[O]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
scheduleRestart(ctx, t)
}
protected def handleReceiveException(ctx: TypedActorContext[O], target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
try {
target.signalRestart(ctx)
} catch {
case NonFatal(ex) ctx.asScala.log.error(ex, "failure during PreRestart")
}
scheduleRestart(ctx, t)
}
protected def handleSignalException(ctx: TypedActorContext[O], target: SignalTarget[T]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
try {
target(ctx, PreRestart)
} catch {
case NonFatal(ex) ctx.asScala.log.error(ex, "failure during PreRestart")
}
scheduleRestart(ctx, t)
}
private def scheduleRestart(ctx: TypedActorContext[O], reason: Throwable): Behavior[T] = {
log(ctx, reason)
val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor)
ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self.unsafeUpcast[Any], ScheduledRestart)
restartCount += 1
blackhole = true
Behaviors.empty
}
}
private object BackoffSupervisor {
private object RestartSupervisor {
/**
* Calculates an exponential back off delay.
*/
@ -281,3 +151,193 @@ private object BackoffSupervisor {
final case class ResetRestartCount(current: Int) extends DeadLetterSuppression
}
private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: RestartOrBackoff)
extends AbstractSupervisor[O, T, Thr](strategy) {
import RestartSupervisor._
private var restartingInProgress: OptionVal[(StashBuffer[Any], Set[ActorRef[Nothing]])] = OptionVal.None
private var restartCount: Int = 0
private var gotScheduledRestart = true
private var deadline: OptionVal[Deadline] = OptionVal.None
private def deadlineHasTimeLeft: Boolean = deadline match {
case OptionVal.None true
case OptionVal.Some(d) d.hasTimeLeft
}
override def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
restartingInProgress match {
case OptionVal.None
super.aroundSignal(ctx, signal, target)
case OptionVal.Some((stashBuffer, children))
signal match {
case Terminated(ref) if strategy.stopChildren && children(ref)
val remainingChildren = children - ref
if (remainingChildren.isEmpty && gotScheduledRestart) {
restartCompleted(ctx)
} else {
restartingInProgress = OptionVal.Some((stashBuffer, remainingChildren))
Behaviors.same
}
case _
if (stashBuffer.isFull)
dropped(ctx, signal)
else
stashBuffer.stash(signal)
Behaviors.same
}
}
}
override def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[T]): Behavior[T] = {
msg.asInstanceOf[Any] match {
case ScheduledRestart
restartingInProgress match {
case OptionVal.Some((_, children))
if (strategy.stopChildren && children.nonEmpty) {
// still waiting for children to stop
gotScheduledRestart = true
Behaviors.same
} else
restartCompleted(ctx)
case OptionVal.None
throw new IllegalStateException("Unexpected ScheduledRestart when restart not in progress")
}
case ResetRestartCount(current)
if (current == restartCount) {
restartCount = 0
}
Behavior.same
case m: T @unchecked
restartingInProgress match {
case OptionVal.None
try {
target(ctx, m)
} catch handleReceiveException(ctx, target)
case OptionVal.Some((stashBuffer, _))
if (stashBuffer.isFull)
dropped(ctx, m)
else
stashBuffer.stash(m)
Behaviors.same
}
}
}
override protected def handleExceptionOnStart(ctx: TypedActorContext[O], @unused target: PreStartTarget[T]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
strategy match {
case _: Restart
// if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop
if (strategy.unlimitedRestarts() || ((restartCount + 1) >= strategy.maxRestarts && deadlineHasTimeLeft)) {
// don't log here as it'll be logged as ActorInitializationException
throw t
} else {
prepareRestart(ctx, t)
}
case _: Backoff
prepareRestart(ctx, t)
}
}
override protected def handleSignalException(ctx: TypedActorContext[O], target: SignalTarget[T]): Catcher[Behavior[T]] = {
handleException(ctx, () target(ctx, PreRestart))
}
override protected def handleReceiveException(ctx: TypedActorContext[O], target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
handleException(ctx, () target.signalRestart(ctx))
}
private def handleException(ctx: TypedActorContext[O], signalRestart: () Unit): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
if (strategy.maxRestarts != -1 && restartCount >= strategy.maxRestarts && deadlineHasTimeLeft) {
strategy match {
case _: Restart throw t
case _: Backoff
log(ctx, t)
Behavior.failed(t)
}
} else {
try signalRestart() catch {
case NonFatal(ex) ctx.asScala.log.error(ex, "failure during PreRestart")
}
prepareRestart(ctx, t)
}
}
private def prepareRestart(ctx: TypedActorContext[O], reason: Throwable): Behavior[T] = {
log(ctx, reason)
val currentRestartCount = restartCount
updateRestartCount()
val childrenToStop = if (strategy.stopChildren) ctx.asScala.children.toSet else Set.empty[ActorRef[Nothing]]
stopChildren(ctx, childrenToStop)
val stashCapacity =
if (strategy.stashCapacity >= 0) strategy.stashCapacity
else ctx.asScala.system.settings.RestartStashCapacity
restartingInProgress = OptionVal.Some((StashBuffer[Any](stashCapacity), childrenToStop))
strategy match {
case backoff: Backoff
val restartDelay = calculateDelay(currentRestartCount, backoff.minBackoff, backoff.maxBackoff, backoff.randomFactor)
gotScheduledRestart = false
ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self.unsafeUpcast[Any], ScheduledRestart)
Behaviors.empty
case _: Restart
if (childrenToStop.isEmpty)
restartCompleted(ctx)
else
Behaviors.empty // wait for termination of children
}
}
private def restartCompleted(ctx: TypedActorContext[O]): Behavior[T] = {
strategy match {
case backoff: Backoff
gotScheduledRestart = false
ctx.asScala.scheduleOnce(backoff.resetBackoffAfter, ctx.asScala.self.unsafeUpcast[Any], ResetRestartCount(restartCount))
case _: Restart
}
try {
val newBehavior = Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[TypedActorContext[T]]))
val nextBehavior = restartingInProgress match {
case OptionVal.None newBehavior
case OptionVal.Some((stashBuffer, _))
restartingInProgress = OptionVal.None
stashBuffer.unstashAll(ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], newBehavior.unsafeCast)
}
nextBehavior.narrow
} catch handleException(ctx, signalRestart = () ())
// FIXME signal Restart is not done if unstashAll throws, unstash of each message may return a new behavior and
// it's the failing one that should receive the signal
}
private def stopChildren(ctx: TypedActorContext[_], children: Set[ActorRef[Nothing]]): Unit = {
children.foreach { child
ctx.asScala.watch(child)
ctx.asScala.stop(child)
}
}
private def updateRestartCount(): Unit = {
strategy match {
case restart: Restart
val timeLeft = deadlineHasTimeLeft
val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + restart.withinTimeRange)
restartCount = if (timeLeft) restartCount + 1 else 1
deadline = newDeadline
case _: Backoff
restartCount += 1
}
}
}

View file

@ -1,7 +1,16 @@
# Fault Tolerance
When an actor throws an unexpected exception, a failure, while processing a message or during initialization, the actor
will by default be stopped. Note that there is an important distinction between failures and validation errors:
will by default be stopped.
@@@ note
An important difference between Typed and Untyped actors is that Typed actors are by default stopped if
an exception is thrown and no supervision strategy is defined while in Untyped they are restarted.
@@@
Note that there is an important distinction between failures and validation errors:
A validation error means that the data of a command sent to an actor is not valid, this should rather be modelled as a
part of the actor protocol than make the actor throw exceptions.
@ -74,6 +83,33 @@ Java
Each returned behavior will be re-wrapped automatically with the supervisor.
## Child actors are stopped when parent is restarting
Child actors are often started in a `setup` block that is run again when the parent actor is restarted.
The child actors are stopped to avoid resource leaks of creating new child actors each time the parent is restarted.
Scala
: @@snip [SupervisionCompileOnly.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala) { #restart-stop-children }
Java
: @@snip [SupervisionCompileOnlyTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #restart-stop-children }
It is possible to override this so that child actors are not influenced when the parent actor is restarted.
The restarted parent instance will then have the same children as before the failure.
If child actors are created from `setup` like in the previous example and they should remain intact (not stopped)
when parent is restarted the `supervise` should be placed inside the `setup` and using
@scala[`SupervisorStrategy.restart.withStopChildren(false)`]@java[`SupervisorStrategy.restart().withStopChildren(false)`]
like this:
Scala
: @@snip [SupervisionCompileOnly.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala) { #restart-keep-children }
Java
: @@snip [SupervisionCompileOnlyTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #restart-keep-children }
That means that the `setup` block will only be run when the parent actor is first started, and not when it is
restarted.
## Bubble failures up through the hierarchy
@ -98,4 +134,4 @@ Scala
: @@snip [FaultToleranceDocSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/FaultToleranceDocSpec.scala) { #bubbling-example }
Java
: @@snip [SupervisionCompileOnlyTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/FaultToleranceDocTest.java) { #bubbling-example }
: @@snip [SupervisionCompileOnlyTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSample.java) { #bubbling-example }

View file

@ -4,7 +4,7 @@
package akka.persistence.typed.javadsl;
import akka.actor.testkit.typed.TE;
import akka.actor.testkit.typed.TestException;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
@ -92,7 +92,7 @@ public class EventSourcedActorFailureTest extends JUnitSuite {
Behavior<String> p1 =
fail(new PersistenceId("fail-recovery-once"), probe.ref(), recoveryFailureProbe.ref());
testKit.spawn(p1);
recoveryFailureProbe.expectMessageClass(TE.class);
recoveryFailureProbe.expectMessageClass(TestException.class);
}
@Test

View file

@ -8,17 +8,17 @@ import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, SupervisorStrategy }
import akka.actor.testkit.typed.TE
import akka.persistence.AtomicWrite
import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.typed.EventRejectedException
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
import akka.actor.testkit.typed.TestException
import akka.persistence.typed.PersistenceId
class ChaosJournal extends InmemJournal {
@ -30,11 +30,11 @@ class ChaosJournal extends InmemJournal {
val pid = messages.head.persistenceId
if (pid == "fail-first-2" && count < 2) {
count += 1
Future.failed(TE("database says no"))
Future.failed(TestException("database says no"))
} else if (pid == "reject-first" && reject) {
reject = false
Future.successful(messages.map(aw Try {
throw TE("I don't like it")
throw TestException("I don't like it")
}))
} else {
super.asyncWriteMessages(messages)
@ -44,9 +44,9 @@ class ChaosJournal extends InmemJournal {
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
if (persistenceId == "fail-recovery-once" && failRecovery) {
failRecovery = false
Future.failed(TE("Nah"))
Future.failed(TestException("Nah"))
} else if (persistenceId == "fail-recovery") {
Future.failed(TE("Nope"))
Future.failed(TestException("Nope"))
} else {
super.asyncReadHighestSequenceNr(persistenceId, fromSequenceNr)
}
@ -93,13 +93,13 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou
spawn(failingPersistentActor(PersistenceId("fail-recovery"))
.onRecoveryFailure(t probe.ref ! t))
probe.expectMessageType[TE].message shouldEqual "Nope"
probe.expectMessageType[TestException].message shouldEqual "Nope"
}
"handle exceptions in onRecoveryFailure" in {
val probe = TestProbe[String]()
val pa = spawn(failingPersistentActor(PersistenceId("fail-recovery-twice"), probe.ref)
.onRecoveryFailure(t throw TE("recovery call back failure")))
.onRecoveryFailure(t throw TestException("recovery call back failure")))
pa ! "one"
probe.expectMessage("starting")
probe.expectMessage("persisting")

View file

@ -9,11 +9,11 @@ import java.util.UUID
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, SupervisorStrategy }
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
import akka.actor.testkit.typed.TE
import akka.actor.testkit.typed.scaladsl.TestProbe
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.persistence.typed.PersistenceId
import org.scalatest.WordSpecLike
@ -85,7 +85,7 @@ object PerformanceSpec {
Effect.persist(evt).thenRun(_ {
parameters.persistCalls += 1
if (parameters.every(1000)) print(".")
if (parameters.shouldFail) throw TE("boom")
if (parameters.shouldFail) throw TestException("boom")
})
case _ Effect.none
}