Untyped/Typed supervision (#26653)

* Clarify supervision from untyped to typed
* Default to restart for untyped spawned from typed
* Default to stop for typed spawned from untyped
This commit is contained in:
Christopher Batey 2019-04-02 20:54:54 +01:00 committed by Patrik Nordwall
parent 9dae4050eb
commit 88091c1ac9
14 changed files with 420 additions and 132 deletions

View file

@ -5,6 +5,7 @@
package akka.actor.typed.javadsl;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
@ -54,9 +55,7 @@ public class AdapterTest extends JUnitSuite {
static Behavior<String> create(akka.actor.ActorRef ref, akka.actor.ActorRef probe) {
Typed1 logic = new Typed1(ref, probe);
return receive(
(context, message) -> logic.onMessage(context, message),
(context, sig) -> logic.onSignal(context, sig));
return receive(logic::onMessage, logic::onSignal);
}
Behavior<String> onMessage(ActorContext<String> context, String message) {
@ -74,7 +73,8 @@ public class AdapterTest extends JUnitSuite {
} else if (message.equals("watch")) {
Adapter.watch(context, ref);
return same();
} else if (message.equals("supervise-stop")) {
} else if (message.equals("supervise-restart")) {
// restart is the default, otherwise an intermediate is required
akka.actor.ActorRef child = Adapter.actorOf(context, untyped1());
Adapter.watch(context, child);
child.tell(new ThrowIt3(), Adapter.toUntyped(context.getSelf()));
@ -315,26 +315,7 @@ public class AdapterTest extends JUnitSuite {
}
@Test
public void shouldSuperviseTypedChildFromUntypedParent() {
TestKit probe = new TestKit(system);
ActorRef<Ping> ignore = Adapter.spawnAnonymous(system, ignore());
akka.actor.ActorRef untypedRef = system.actorOf(untyped2(ignore, probe.getRef()));
untypedRef.tell("supervise-stop", akka.actor.ActorRef.noSender());
probe.expectMsg("thrown-stop");
// ping => ok should not get through here
probe.expectMsg("terminated");
untypedRef.tell("supervise-resume", akka.actor.ActorRef.noSender());
probe.expectMsg("thrown-resume");
probe.expectMsg("ok");
untypedRef.tell("supervise-restart", akka.actor.ActorRef.noSender());
probe.expectMsg("thrown-restart");
probe.expectMsg("ok");
}
@Test
public void shouldSuperviseUntypedChildFromTypedParent() {
public void shouldSuperviseUntypedChildAsRestartFromTypedParent() {
TestKit probe = new TestKit(system);
akka.actor.ActorRef ignore = system.actorOf(akka.actor.Props.empty());
ActorRef<String> typedRef =
@ -345,9 +326,8 @@ public class AdapterTest extends JUnitSuite {
// suppress the logging with stack trace
system.getEventStream().setLogLevel(Integer.MIN_VALUE); // OFF
// only stop supervisorStrategy
typedRef.tell("supervise-stop");
probe.expectMsg("terminated");
typedRef.tell("supervise-restart");
probe.expectMsg("ok");
} finally {
system.getEventStream().setLogLevel(originalLogLevel);
}

View file

@ -295,7 +295,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
def behv =
supervise(setup[Command] { _ =>
probe.ref ! StartFailed
throw new TestException("construction failed")
throw TestException("construction failed")
}).onFailure[IllegalArgumentException](strategy)
}
@ -308,6 +308,16 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
probe.expectMessage(Pong(1))
}
"default to stop when no strategy" in {
val probe = TestProbe[Event]("evt")
val behv = targetBehavior(probe.ref)
val ref = spawn(behv)
EventFilter[Exc3](occurrences = 1).intercept {
ref ! Throw(new Exc3)
probe.expectMessage(ReceivedSignal(PostStop))
probe.expectTerminated(ref)
}
}
"stop when strategy is stop" in {
val probe = TestProbe[Event]("evt")
val behv = Behaviors.supervise(targetBehavior(probe.ref)).onFailure[Throwable](SupervisorStrategy.stop)
@ -315,6 +325,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
EventFilter[Exc3](occurrences = 1).intercept {
ref ! Throw(new Exc3)
probe.expectMessage(ReceivedSignal(PostStop))
probe.expectTerminated(ref)
}
}

View file

@ -19,7 +19,9 @@ import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object WatchSpec {
val config = ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")
val config = ConfigFactory.parseString("""
akka.loggers = ["akka.testkit.TestEventListener"]
""".stripMargin)
case object Stop
@ -116,7 +118,7 @@ class WatchSpec extends ScalaTestWithActorTestKit(WatchSpec.config) with WordSpe
"notify a parent of child termination because of failure with a supervisor" in {
val probe = TestProbe[Any]()
val ex = new TestException("boom")
val ex = TestException("boom")
val behavior = Behaviors.setup[Any] { context =>
val child = context.spawn(
Behaviors

View file

@ -0,0 +1,65 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.coexistence
import akka.actor.Actor
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe }
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors
import org.scalatest.{ WordSpec, WordSpecLike }
import akka.actor.typed.scaladsl.adapter._
import akka.{ actor => untyped }
import scala.concurrent.duration._
object TypedSupervisingUntypedSpec {
sealed trait Protocol
final case class SpawnUntypedActor(props: untyped.Props, replyTo: ActorRef[SpawnedUntypedActor]) extends Protocol
final case class SpawnedUntypedActor(ref: untyped.ActorRef)
def untypedActorOf() = Behaviors.receive[Protocol] {
case (ctx, SpawnUntypedActor(props, replyTo)) =>
replyTo ! SpawnedUntypedActor(ctx.actorOf(props))
Behaviors.same
}
class UntypedActor(lifecycleProbe: ActorRef[String]) extends Actor {
override def receive: Receive = {
case "throw" => throw TestException("oh dear")
}
override def postStop(): Unit = {
lifecycleProbe ! "postStop"
}
override def preStart(): Unit = {
lifecycleProbe ! "preStart"
}
}
}
class TypedSupervisingUntypedSpec extends ScalaTestWithActorTestKit("""
akka.loglevel = INFO
""".stripMargin) with WordSpecLike {
import TypedSupervisingUntypedSpec._
"Typed supervising untyped" should {
"default to restart" in {
val ref: ActorRef[Protocol] = spawn(untypedActorOf())
val lifecycleProbe = TestProbe[String]
val probe = TestProbe[SpawnedUntypedActor]
ref ! SpawnUntypedActor(untyped.Props(new UntypedActor(lifecycleProbe.ref)), probe.ref)
val spawnedUntyped = probe.expectMessageType[SpawnedUntypedActor].ref
lifecycleProbe.expectMessage("preStart")
spawnedUntyped ! "throw"
lifecycleProbe.expectMessage("postStop")
// should be restarted because it is an untyped actor
lifecycleProbe.expectMessage("preStart")
}
}
}

View file

@ -0,0 +1,130 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.coexistence
import akka.actor.Actor
import akka.actor.testkit.typed.TestException
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed._
import akka.actor.typed.coexistence.UntypedSupervisingTypedSpec.{
SpawnAnonFromUntyped,
SpawnFromUntyped,
TypedSpawnedFromUntypedConext,
UntypedToTyped
}
import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe }
import akka.actor.typed.scaladsl.adapter._
import akka.{ actor => u }
import scala.concurrent.duration._
object ProbedBehavior {
def behavior(probe: u.ActorRef): Behavior[String] = {
Behaviors
.receiveMessage[String] {
case "throw" => throw TestException("oh dear")
}
.receiveSignal {
case (_, s) =>
probe ! s
Behaviors.same
}
}
}
object UntypedSupervisingTypedSpec {
case class SpawnFromUntyped(behav: Behavior[String], name: String)
case class SpawnAnonFromUntyped(behav: Behavior[String])
case class TypedSpawnedFromUntypedConext(actorRef: ActorRef[String])
class UntypedToTyped extends Actor {
override def receive: Receive = {
case SpawnFromUntyped(behav, name) =>
sender() ! TypedSpawnedFromUntypedConext(context.spawn(behav, name))
case SpawnAnonFromUntyped(behav) =>
sender() ! TypedSpawnedFromUntypedConext(context.spawnAnonymous(behav))
}
}
}
class UntypedSupervisingTypedSpec extends AkkaSpec with ImplicitSender {
implicit val typedActorSystem: ActorSystem[Nothing] = system.toTyped
val smallDuration = 50.millis
"An untyped actor system that spawns typed actors" should {
"default to stop for supervision" in {
val probe = TestProbe()
val underTest = system.spawn(ProbedBehavior.behavior(probe.ref), "a1")
watch(underTest.toUntyped)
underTest ! "throw"
probe.expectMsg(PostStop)
probe.expectNoMessage(smallDuration)
expectTerminated(underTest.toUntyped)
}
"default to stop for supervision for spawn anonymous" in {
val probe = TestProbe()
val underTest = system.spawnAnonymous(ProbedBehavior.behavior(probe.ref))
watch(underTest.toUntyped)
underTest ! "throw"
probe.expectMsg(PostStop)
probe.expectNoMessage(smallDuration)
expectTerminated(underTest.toUntyped)
}
"allows overriding the default" in {
val probe = TestProbe()
val value = Behaviors.supervise(ProbedBehavior.behavior(probe.ref)).onFailure(SupervisorStrategy.restart)
val underTest = system.spawn(value, "a2")
watch(underTest.toUntyped)
underTest ! "throw"
probe.expectMsg(PreRestart)
probe.expectNoMessage(smallDuration)
expectNoMessage(smallDuration)
}
"default to stop supervision (from context)" in {
val untyped = system.actorOf(u.Props(new UntypedToTyped()))
val probe = TestProbe()
untyped ! SpawnFromUntyped(ProbedBehavior.behavior(probe.ref), "a3")
val underTest = expectMsgType[TypedSpawnedFromUntypedConext].actorRef
watch(underTest.toUntyped)
underTest ! "throw"
probe.expectMsg(PostStop)
probe.expectNoMessage(smallDuration)
expectTerminated(underTest.toUntyped)
}
"allow overriding the default (from context)" in {
val untyped = system.actorOf(u.Props(new UntypedToTyped()))
val probe = TestProbe()
val behavior = Behaviors.supervise(ProbedBehavior.behavior(probe.ref)).onFailure(SupervisorStrategy.restart)
untyped ! SpawnFromUntyped(behavior, "a4")
val underTest = expectMsgType[TypedSpawnedFromUntypedConext].actorRef
watch(underTest.toUntyped)
underTest ! "throw"
probe.expectMsg(PreRestart)
probe.expectNoMessage(smallDuration)
expectNoMessage(smallDuration)
}
"default to stop supervision for spawn anonymous (from context)" in {
val untyped = system.actorOf(u.Props(new UntypedToTyped()))
val probe = TestProbe()
untyped ! SpawnAnonFromUntyped(ProbedBehavior.behavior(probe.ref))
val underTest = expectMsgType[TypedSpawnedFromUntypedConext].actorRef
watch(underTest.toUntyped)
underTest ! "throw"
probe.expectMsg(PostStop)
probe.expectNoMessage(smallDuration)
expectTerminated(underTest.toUntyped)
}
}
}

View file

@ -55,7 +55,8 @@ object AdapterSpec {
case "watch" =>
context.watch(ref)
Behaviors.same
case "supervise-stop" =>
case "supervise-restart" =>
// restart is the default
val child = context.actorOf(untyped1)
context.watch(child)
child ! ThrowIt3
@ -271,31 +272,6 @@ class AdapterSpec extends AkkaSpec("""
probe.expectMsg("terminated")
}
"supervise typed child from untyped parent" in {
val probe = TestProbe()
val ign = system.spawnAnonymous(Behaviors.ignore[Ping])
val untypedRef = system.actorOf(untyped2(ign, probe.ref))
EventFilter[AdapterSpec.ThrowIt1.type](occurrences = 1).intercept {
EventFilter.warning(pattern = """.*received dead letter.*""", occurrences = 1).intercept {
untypedRef ! "supervise-stop"
probe.expectMsg("thrown-stop")
// ping => ok should not get through here
probe.expectMsg("terminated")
}
}
untypedRef ! "supervise-resume"
probe.expectMsg("thrown-resume")
probe.expectMsg("ok")
EventFilter[AdapterSpec.ThrowIt3.type](occurrences = 1).intercept {
untypedRef ! "supervise-restart"
probe.expectMsg("thrown-restart")
probe.expectMsg("ok")
}
}
"supervise untyped child from typed parent" in {
// FIXME there's a warning with null logged from the untyped empty child here, where does that come from?
val probe = TestProbe()
@ -304,11 +280,8 @@ class AdapterSpec extends AkkaSpec("""
// only stop supervisorStrategy
EventFilter[AdapterSpec.ThrowIt3.type](occurrences = 1).intercept {
EventFilter.warning(pattern = """.*received dead letter.*""", occurrences = 1).intercept {
typedRef ! "supervise-stop"
probe.expectMsg("terminated")
probe.expectNoMessage(100.millis) // no pong
}
typedRef ! "supervise-restart"
probe.expectMsg("ok")
}
}

View file

@ -6,7 +6,8 @@ package docs.akka.typed.coexistence
import akka.actor.typed._
import akka.actor.typed.scaladsl.Behaviors
import akka.testkit.TestKit
import akka.testkit.{ AkkaSpec, TestKit }
import docs.akka.typed.coexistence.TypedWatchingUntypedSpec.Typed
//#adapter-import
// adds support for typed actors to an untyped actor system and context
import akka.actor.typed.scaladsl.adapter._
@ -17,6 +18,7 @@ import akka.{ actor => untyped }
//#import-alias
import org.scalatest.WordSpec
import scala.concurrent.duration._
import TypedWatchingUntypedSpec.Typed._
object TypedWatchingUntypedSpec {

View file

@ -8,7 +8,7 @@ package adapter
import java.lang.reflect.InvocationTargetException
import akka.actor.ActorInitializationException
import akka.actor.{ ActorInitializationException, ActorRefWithCell }
import akka.{ actor => untyped }
import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.Behavior.StoppedBehavior
@ -34,6 +34,8 @@ import akka.util.OptionVal
* the cause and can fill in the cause in the `ChildFailed` signal
* Wrapped to avoid it being logged as the typed supervision will already
* have logged it.
*
* Should only be thrown if the parent is known to be an `ActorAdapter`.
*/
final case class TypedActorFailedException(cause: Throwable) extends RuntimeException
@ -46,7 +48,7 @@ import akka.util.OptionVal
/**
* INTERNAL API
*/
@InternalApi private[typed] final class ActorAdapter[T](_initialBehavior: Behavior[T])
@InternalApi private[typed] final class ActorAdapter[T](_initialBehavior: Behavior[T], rethrowTypedFailure: Boolean)
extends untyped.Actor
with untyped.ActorLogging {
import Behavior._
@ -146,7 +148,8 @@ import akka.util.OptionVal
case BehaviorTags.FailedBehavior =>
val f = b.asInstanceOf[FailedBehavior]
// For the parent untyped supervisor to pick up the exception
throw TypedActorFailedException(f.cause)
if (rethrowTypedFailure) throw TypedActorFailedException(f.cause)
else context.stop(self)
case BehaviorTags.StoppedBehavior =>
val stopped = b.asInstanceOf[StoppedBehavior[T]]
behavior = new ComposedStoppingBehavior[T](behavior, stopped)
@ -200,6 +203,12 @@ import akka.util.OptionVal
recordChildFailure(cause)
untyped.SupervisorStrategy.Stop
case ex =>
val isTypedActor = sender() match {
case afwc: ActorRefWithCell =>
afwc.underlying.props.producer.actorClass == classOf[ActorAdapter[_]]
case _ =>
false
}
recordChildFailure(ex)
val logMessage = ex match {
case e: ActorInitializationException if e.getCause ne null =>
@ -211,7 +220,10 @@ import akka.util.OptionVal
}
// log at Error as that is what the supervision strategy would have done.
log.error(ex, logMessage)
untyped.SupervisorStrategy.Stop
if (isTypedActor)
untyped.SupervisorStrategy.Stop
else
untyped.SupervisorStrategy.Restart
}
private def recordChildFailure(ex: Throwable): Unit = {

View file

@ -10,11 +10,42 @@ import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi
import akka.event.LoggingFilterWithMarker
import akka.util.OptionVal
import akka.{ ConfigurationException, actor => untyped }
import akka.{ actor => untyped }
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
@InternalApi
private[akka] object ActorContextAdapter {
private def toUntypedImp[U](context: TypedActorContext[_]): untyped.ActorContext =
context match {
case adapter: ActorContextAdapter[_] => adapter.untypedContext
case _ =>
throw new UnsupportedOperationException(
"only adapted untyped ActorContext permissible " +
s"($context of class ${context.getClass.getName})")
}
def toUntyped[U](context: scaladsl.ActorContext[_]): untyped.ActorContext =
context match {
case c: TypedActorContext[_] => toUntypedImp(c)
case _ =>
throw new UnsupportedOperationException(
"unknown ActorContext type " +
s"($context of class ${context.getClass.getName})")
}
def toUntyped[U](context: javadsl.ActorContext[_]): untyped.ActorContext =
context match {
case c: TypedActorContext[_] => toUntypedImp(c)
case _ =>
throw new UnsupportedOperationException(
"unknown ActorContext type " +
s"($context of class ${context.getClass.getName})")
}
}
/**
* INTERNAL API. Wrapping an [[akka.actor.ActorContext]] as an [[TypedActorContext]].
*/
@ -35,9 +66,9 @@ import scala.concurrent.duration._
override def children: Iterable[ActorRef[Nothing]] = untypedContext.children.map(ActorRefAdapter(_))
override def child(name: String): Option[ActorRef[Nothing]] = untypedContext.child(name).map(ActorRefAdapter(_))
override def spawnAnonymous[U](behavior: Behavior[U], props: Props = Props.empty): ActorRef[U] =
ActorContextAdapter.spawnAnonymous(untypedContext, behavior, props)
ActorRefFactoryAdapter.spawnAnonymous(untypedContext, behavior, props, rethrowTypedFailure = true)
override def spawn[U](behavior: Behavior[U], name: String, props: Props = Props.empty): ActorRef[U] =
ActorContextAdapter.spawn(untypedContext, behavior, name, props)
ActorRefFactoryAdapter.spawn(untypedContext, behavior, name, props, rethrowTypedFailure = true)
override def stop[U](child: ActorRef[U]): Unit =
if (child.path.parent == self.path) { // only if a direct child
toUntyped(child) match {
@ -117,59 +148,3 @@ import scala.concurrent.duration._
*/
private[akka] override def onUnhandled(msg: T): Unit = adapter.unhandled(msg)
}
/**
* INTERNAL API
*/
@InternalApi private[typed] object ActorContextAdapter {
private def toUntypedImp[U](context: TypedActorContext[_]): untyped.ActorContext =
context match {
case adapter: ActorContextAdapter[_] => adapter.untypedContext
case _ =>
throw new UnsupportedOperationException(
"only adapted untyped ActorContext permissible " +
s"($context of class ${context.getClass.getName})")
}
def toUntyped2[U](context: TypedActorContext[_]): untyped.ActorContext = toUntypedImp(context)
def toUntyped[U](context: scaladsl.ActorContext[_]): untyped.ActorContext =
context match {
case c: TypedActorContext[_] => toUntypedImp(c)
case _ =>
throw new UnsupportedOperationException(
"unknown ActorContext type " +
s"($context of class ${context.getClass.getName})")
}
def toUntyped[U](context: javadsl.ActorContext[_]): untyped.ActorContext =
context match {
case c: TypedActorContext[_] => toUntypedImp(c)
case _ =>
throw new UnsupportedOperationException(
"unknown ActorContext type " +
s"($context of class ${context.getClass.getName})")
}
def spawnAnonymous[T](context: akka.actor.ActorContext, behavior: Behavior[T], props: Props): ActorRef[T] = {
try {
Behavior.validateAsInitial(behavior)
ActorRefAdapter(context.actorOf(PropsAdapter(() => behavior, props)))
} catch {
case ex: ConfigurationException if ex.getMessage.startsWith("configuration requested remote deployment") =>
throw new ConfigurationException("Remote deployment not allowed for typed actors", ex)
}
}
def spawn[T](context: akka.actor.ActorContext, behavior: Behavior[T], name: String, props: Props): ActorRef[T] = {
try {
Behavior.validateAsInitial(behavior)
ActorRefAdapter(context.actorOf(PropsAdapter(() => behavior, props), name))
} catch {
case ex: ConfigurationException if ex.getMessage.startsWith("configuration requested remote deployment") =>
throw new ConfigurationException("Remote deployment not allowed for typed actors", ex)
}
}
}

View file

@ -0,0 +1,44 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal.adapter
import akka.actor.typed._
import akka.annotation.InternalApi
import akka.ConfigurationException
/**
* INTERNAL API
*/
@InternalApi private[typed] object ActorRefFactoryAdapter {
def spawnAnonymous[T](
context: akka.actor.ActorRefFactory,
behavior: Behavior[T],
props: Props,
rethrowTypedFailure: Boolean): ActorRef[T] = {
try {
ActorRefAdapter(context.actorOf(internal.adapter.PropsAdapter(() => behavior, props, rethrowTypedFailure)))
} catch {
case ex: ConfigurationException if ex.getMessage.startsWith("configuration requested remote deployment") =>
throw new ConfigurationException("Remote deployment not allowed for typed actors", ex)
}
}
def spawn[T](
actorRefFactory: akka.actor.ActorRefFactory,
behavior: Behavior[T],
name: String,
props: Props,
rethrowTypedFailure: Boolean): ActorRef[T] = {
try {
ActorRefAdapter(
actorRefFactory.actorOf(
internal.adapter.PropsAdapter(() => Behavior.validateAsInitial(behavior), props, rethrowTypedFailure),
name))
} catch {
case ex: ConfigurationException if ex.getMessage.startsWith("configuration requested remote deployment") =>
throw new ConfigurationException("Remote deployment not allowed for typed actors", ex)
}
}
}

View file

@ -13,8 +13,11 @@ import akka.annotation.InternalApi
* INTERNAL API
*/
@InternalApi private[akka] object PropsAdapter {
def apply[T](behavior: () => Behavior[T], deploy: Props = Props.empty): akka.actor.Props = {
val props = akka.actor.Props(new ActorAdapter(behavior()))
def apply[T](
behavior: () => Behavior[T],
deploy: Props = Props.empty,
rethrowTypedFailure: Boolean = true): akka.actor.Props = {
val props = akka.actor.Props(new ActorAdapter(behavior(), rethrowTypedFailure))
(deploy.firstOrElse[DispatcherSelector](DispatcherDefault()) match {
case _: DispatcherDefault => props

View file

@ -30,27 +30,67 @@ import akka.japi.Creator
*/
object Adapter {
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorSystem.
* Typed actors default supervision strategy is to stop. Can be overridden with
* Behaviors.supervise.
*/
def spawnAnonymous[T](sys: akka.actor.ActorSystem, behavior: Behavior[T]): ActorRef[T] =
spawnAnonymous(sys, behavior, EmptyProps)
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorSystem.
* Typed actors default supervision strategy is to stop. Can be overridden with
* Behaviors.supervise.
*/
def spawnAnonymous[T](sys: akka.actor.ActorSystem, behavior: Behavior[T], props: Props): ActorRef[T] =
sys.spawnAnonymous(behavior, props)
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorSystem.
* Typed actors default supervision strategy is to stop. Can be overridden with
* Behaviors.supervise.
*/
def spawn[T](sys: akka.actor.ActorSystem, behavior: Behavior[T], name: String): ActorRef[T] =
spawn(sys, behavior, name, EmptyProps)
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorSystem.
* Typed actors default supervision strategy is to stop. Can be overridden with
* Behaviors.supervise.
*/
def spawn[T](sys: akka.actor.ActorSystem, behavior: Behavior[T], name: String, props: Props): ActorRef[T] =
sys.spawn(behavior, name, props)
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorContext.
* Typed actors default supervision strategy is to stop. Can be overridden with
* Behaviors.supervise.
*/
def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T]): ActorRef[T] =
spawnAnonymous(ctx, behavior, EmptyProps)
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorContext.
* Typed actors default supervision strategy is to stop. Can be overridden with
* Behaviors.supervise.
*/
def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], props: Props): ActorRef[T] =
ctx.spawnAnonymous(behavior, props)
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorContext.
* Typed actors default supervision strategy is to stop. Can be overridden with
* Behaviors.supervise.
*/
def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String): ActorRef[T] =
spawn(ctx, behavior, name, EmptyProps)
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorContext.
* Typed actors default supervision strategy is to stop. Can be overridden with
* Behaviors.supervise.
*/
def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String, props: Props): ActorRef[T] =
ctx.spawn(behavior, name, props)

View file

@ -38,12 +38,33 @@ package object adapter {
*/
implicit class UntypedActorSystemOps(val sys: akka.actor.ActorSystem) extends AnyVal {
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorSystem.
*
* Typed actors default supervision strategy is to stop. Can be overridden with
* `Behaviors.supervise`.
*/
def spawnAnonymous[T](behavior: Behavior[T], props: Props = Props.empty): ActorRef[T] = {
ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), props)))
ActorRefFactoryAdapter.spawnAnonymous(
sys,
Behaviors.supervise(behavior).onFailure(SupervisorStrategy.stop),
props,
rethrowTypedFailure = false)
}
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorSystem.
*
* Typed actors default supervision strategy is to stop. Can be overridden with
* `Behaviors.supervise`.
*/
def spawn[T](behavior: Behavior[T], name: String, props: Props = Props.empty): ActorRef[T] = {
ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), props), name))
ActorRefFactoryAdapter.spawn(
sys,
Behaviors.supervise(behavior).onFailure(SupervisorStrategy.stop),
name,
props,
rethrowTypedFailure = false)
}
def toTyped: ActorSystem[Nothing] = AdapterExtension(sys).adapter
@ -70,10 +91,33 @@ package object adapter {
* Extension methods added to [[akka.actor.ActorContext]].
*/
implicit class UntypedActorContextOps(val ctx: akka.actor.ActorContext) extends AnyVal {
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorContext.
*
* Typed actors default supervision strategy is to stop. Can be overridden with
* `Behaviors.supervise`.
*/
def spawnAnonymous[T](behavior: Behavior[T], props: Props = Props.empty): ActorRef[T] =
ActorContextAdapter.spawnAnonymous(ctx, behavior, props)
ActorRefFactoryAdapter.spawnAnonymous(
ctx,
Behaviors.supervise(behavior).onFailure(SupervisorStrategy.stop),
props,
rethrowTypedFailure = false)
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorContext.
*
* Typed actors default supervision strategy is to stop. Can be overridden with
* `Behaviors.supervise`.
*/
def spawn[T](behavior: Behavior[T], name: String, props: Props = Props.empty): ActorRef[T] =
ActorContextAdapter.spawn(ctx, behavior, name, props)
ActorRefFactoryAdapter.spawn(
ctx,
Behaviors.supervise(behavior).onFailure(SupervisorStrategy.stop),
name,
props,
rethrowTypedFailure = false)
def watch[U](other: ActorRef[U]): Unit = ctx.watch(ActorRefAdapter.toUntyped(other))
def unwatch[U](other: ActorRef[U]): Unit = ctx.unwatch(ActorRefAdapter.toUntyped(other))
@ -88,6 +132,7 @@ package object adapter {
implicit class TypedActorContextOps(val ctx: scaladsl.ActorContext[_]) extends AnyVal {
def actorOf(props: akka.actor.Props): akka.actor.ActorRef =
ActorContextAdapter.toUntyped(ctx).actorOf(props)
def actorOf(props: akka.actor.Props, name: String): akka.actor.ActorRef =
ActorContextAdapter.toUntyped(ctx).actorOf(props, name)

View file

@ -115,6 +115,12 @@ Scala
Java
: @@snip [TypedWatchingUntypedTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/coexistence/TypedWatchingUntypedTest.java) { #typed }
There is one caveat regarding supervision of untyped child from typed parent. If the child throws an exception we would expect it to be restarted, but supervision in Akka Typed defaults to stopping the child in case it fails. The restarting facilities in Akka Typed will not work with untyped children. However, the workaround is to add another untyped actor that takes care of the supervision, i.e. restarts in case of failure if that is the desired behavior.
## Supervision
The default supervision for untyped actors is to restart where as for typed it is to stop.
When combining untyped and and typed actors the default supervision is based on the default behavior of
the child i.e. if an untyped actor creates a typed child, its default supervision will be to stop. If a typed
actor creates an untyped child, its default supervision will be to restart.