diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/AdapterTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/AdapterTest.java index 878ca68c54..141469927a 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/AdapterTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/AdapterTest.java @@ -5,6 +5,7 @@ package akka.actor.typed.javadsl; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.scalatest.junit.JUnitSuite; @@ -54,9 +55,7 @@ public class AdapterTest extends JUnitSuite { static Behavior create(akka.actor.ActorRef ref, akka.actor.ActorRef probe) { Typed1 logic = new Typed1(ref, probe); - return receive( - (context, message) -> logic.onMessage(context, message), - (context, sig) -> logic.onSignal(context, sig)); + return receive(logic::onMessage, logic::onSignal); } Behavior onMessage(ActorContext context, String message) { @@ -74,7 +73,8 @@ public class AdapterTest extends JUnitSuite { } else if (message.equals("watch")) { Adapter.watch(context, ref); return same(); - } else if (message.equals("supervise-stop")) { + } else if (message.equals("supervise-restart")) { + // restart is the default, otherwise an intermediate is required akka.actor.ActorRef child = Adapter.actorOf(context, untyped1()); Adapter.watch(context, child); child.tell(new ThrowIt3(), Adapter.toUntyped(context.getSelf())); @@ -315,26 +315,7 @@ public class AdapterTest extends JUnitSuite { } @Test - public void shouldSuperviseTypedChildFromUntypedParent() { - TestKit probe = new TestKit(system); - ActorRef ignore = Adapter.spawnAnonymous(system, ignore()); - akka.actor.ActorRef untypedRef = system.actorOf(untyped2(ignore, probe.getRef())); - untypedRef.tell("supervise-stop", akka.actor.ActorRef.noSender()); - probe.expectMsg("thrown-stop"); - // ping => ok should not get through here - probe.expectMsg("terminated"); - - untypedRef.tell("supervise-resume", akka.actor.ActorRef.noSender()); - probe.expectMsg("thrown-resume"); - probe.expectMsg("ok"); - - untypedRef.tell("supervise-restart", akka.actor.ActorRef.noSender()); - probe.expectMsg("thrown-restart"); - probe.expectMsg("ok"); - } - - @Test - public void shouldSuperviseUntypedChildFromTypedParent() { + public void shouldSuperviseUntypedChildAsRestartFromTypedParent() { TestKit probe = new TestKit(system); akka.actor.ActorRef ignore = system.actorOf(akka.actor.Props.empty()); ActorRef typedRef = @@ -345,9 +326,8 @@ public class AdapterTest extends JUnitSuite { // suppress the logging with stack trace system.getEventStream().setLogLevel(Integer.MIN_VALUE); // OFF - // only stop supervisorStrategy - typedRef.tell("supervise-stop"); - probe.expectMsg("terminated"); + typedRef.tell("supervise-restart"); + probe.expectMsg("ok"); } finally { system.getEventStream().setLogLevel(originalLogLevel); } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index d7a943e75e..4fbaf01c7e 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -295,7 +295,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" def behv = supervise(setup[Command] { _ => probe.ref ! StartFailed - throw new TestException("construction failed") + throw TestException("construction failed") }).onFailure[IllegalArgumentException](strategy) } @@ -308,6 +308,16 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" probe.expectMessage(Pong(1)) } + "default to stop when no strategy" in { + val probe = TestProbe[Event]("evt") + val behv = targetBehavior(probe.ref) + val ref = spawn(behv) + EventFilter[Exc3](occurrences = 1).intercept { + ref ! Throw(new Exc3) + probe.expectMessage(ReceivedSignal(PostStop)) + probe.expectTerminated(ref) + } + } "stop when strategy is stop" in { val probe = TestProbe[Event]("evt") val behv = Behaviors.supervise(targetBehavior(probe.ref)).onFailure[Throwable](SupervisorStrategy.stop) @@ -315,6 +325,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" EventFilter[Exc3](occurrences = 1).intercept { ref ! Throw(new Exc3) probe.expectMessage(ReceivedSignal(PostStop)) + probe.expectTerminated(ref) } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala index bebc17919c..66e97d04b3 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala @@ -19,7 +19,9 @@ import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike object WatchSpec { - val config = ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""") + val config = ConfigFactory.parseString(""" + akka.loggers = ["akka.testkit.TestEventListener"] + """.stripMargin) case object Stop @@ -116,7 +118,7 @@ class WatchSpec extends ScalaTestWithActorTestKit(WatchSpec.config) with WordSpe "notify a parent of child termination because of failure with a supervisor" in { val probe = TestProbe[Any]() - val ex = new TestException("boom") + val ex = TestException("boom") val behavior = Behaviors.setup[Any] { context => val child = context.spawn( Behaviors diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/coexistence/TypedSupervisingUntypedSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/coexistence/TypedSupervisingUntypedSpec.scala new file mode 100644 index 0000000000..f5e544f2f1 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/coexistence/TypedSupervisingUntypedSpec.scala @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.coexistence +import akka.actor.Actor +import akka.actor.testkit.typed.TestException +import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe } +import akka.actor.typed.ActorRef +import akka.actor.typed.scaladsl.Behaviors +import org.scalatest.{ WordSpec, WordSpecLike } +import akka.actor.typed.scaladsl.adapter._ +import akka.{ actor => untyped } + +import scala.concurrent.duration._ + +object TypedSupervisingUntypedSpec { + + sealed trait Protocol + final case class SpawnUntypedActor(props: untyped.Props, replyTo: ActorRef[SpawnedUntypedActor]) extends Protocol + final case class SpawnedUntypedActor(ref: untyped.ActorRef) + + def untypedActorOf() = Behaviors.receive[Protocol] { + case (ctx, SpawnUntypedActor(props, replyTo)) => + replyTo ! SpawnedUntypedActor(ctx.actorOf(props)) + Behaviors.same + } + + class UntypedActor(lifecycleProbe: ActorRef[String]) extends Actor { + override def receive: Receive = { + case "throw" => throw TestException("oh dear") + } + + override def postStop(): Unit = { + lifecycleProbe ! "postStop" + } + + override def preStart(): Unit = { + lifecycleProbe ! "preStart" + } + } + +} + +class TypedSupervisingUntypedSpec extends ScalaTestWithActorTestKit(""" + akka.loglevel = INFO + """.stripMargin) with WordSpecLike { + import TypedSupervisingUntypedSpec._ + + "Typed supervising untyped" should { + "default to restart" in { + val ref: ActorRef[Protocol] = spawn(untypedActorOf()) + val lifecycleProbe = TestProbe[String] + val probe = TestProbe[SpawnedUntypedActor] + ref ! SpawnUntypedActor(untyped.Props(new UntypedActor(lifecycleProbe.ref)), probe.ref) + val spawnedUntyped = probe.expectMessageType[SpawnedUntypedActor].ref + lifecycleProbe.expectMessage("preStart") + spawnedUntyped ! "throw" + lifecycleProbe.expectMessage("postStop") + // should be restarted because it is an untyped actor + lifecycleProbe.expectMessage("preStart") + } + } + +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/coexistence/UntypedSupervisingTypedSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/coexistence/UntypedSupervisingTypedSpec.scala new file mode 100644 index 0000000000..0ae06785f6 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/coexistence/UntypedSupervisingTypedSpec.scala @@ -0,0 +1,130 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.coexistence +import akka.actor.Actor +import akka.actor.testkit.typed.TestException +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed._ +import akka.actor.typed.coexistence.UntypedSupervisingTypedSpec.{ + SpawnAnonFromUntyped, + SpawnFromUntyped, + TypedSpawnedFromUntypedConext, + UntypedToTyped +} +import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe } +import akka.actor.typed.scaladsl.adapter._ +import akka.{ actor => u } + +import scala.concurrent.duration._ + +object ProbedBehavior { + def behavior(probe: u.ActorRef): Behavior[String] = { + Behaviors + .receiveMessage[String] { + case "throw" => throw TestException("oh dear") + } + .receiveSignal { + case (_, s) => + probe ! s + Behaviors.same + } + + } +} + +object UntypedSupervisingTypedSpec { + + case class SpawnFromUntyped(behav: Behavior[String], name: String) + case class SpawnAnonFromUntyped(behav: Behavior[String]) + case class TypedSpawnedFromUntypedConext(actorRef: ActorRef[String]) + + class UntypedToTyped extends Actor { + + override def receive: Receive = { + case SpawnFromUntyped(behav, name) => + sender() ! TypedSpawnedFromUntypedConext(context.spawn(behav, name)) + case SpawnAnonFromUntyped(behav) => + sender() ! TypedSpawnedFromUntypedConext(context.spawnAnonymous(behav)) + + } + } +} + +class UntypedSupervisingTypedSpec extends AkkaSpec with ImplicitSender { + + implicit val typedActorSystem: ActorSystem[Nothing] = system.toTyped + val smallDuration = 50.millis + + "An untyped actor system that spawns typed actors" should { + "default to stop for supervision" in { + val probe = TestProbe() + val underTest = system.spawn(ProbedBehavior.behavior(probe.ref), "a1") + watch(underTest.toUntyped) + underTest ! "throw" + probe.expectMsg(PostStop) + probe.expectNoMessage(smallDuration) + expectTerminated(underTest.toUntyped) + } + + "default to stop for supervision for spawn anonymous" in { + val probe = TestProbe() + val underTest = system.spawnAnonymous(ProbedBehavior.behavior(probe.ref)) + watch(underTest.toUntyped) + underTest ! "throw" + probe.expectMsg(PostStop) + probe.expectNoMessage(smallDuration) + expectTerminated(underTest.toUntyped) + } + + "allows overriding the default" in { + val probe = TestProbe() + val value = Behaviors.supervise(ProbedBehavior.behavior(probe.ref)).onFailure(SupervisorStrategy.restart) + val underTest = system.spawn(value, "a2") + watch(underTest.toUntyped) + underTest ! "throw" + probe.expectMsg(PreRestart) + probe.expectNoMessage(smallDuration) + expectNoMessage(smallDuration) + } + + "default to stop supervision (from context)" in { + val untyped = system.actorOf(u.Props(new UntypedToTyped())) + val probe = TestProbe() + untyped ! SpawnFromUntyped(ProbedBehavior.behavior(probe.ref), "a3") + val underTest = expectMsgType[TypedSpawnedFromUntypedConext].actorRef + watch(underTest.toUntyped) + underTest ! "throw" + probe.expectMsg(PostStop) + probe.expectNoMessage(smallDuration) + expectTerminated(underTest.toUntyped) + } + + "allow overriding the default (from context)" in { + val untyped = system.actorOf(u.Props(new UntypedToTyped())) + val probe = TestProbe() + val behavior = Behaviors.supervise(ProbedBehavior.behavior(probe.ref)).onFailure(SupervisorStrategy.restart) + untyped ! SpawnFromUntyped(behavior, "a4") + val underTest = expectMsgType[TypedSpawnedFromUntypedConext].actorRef + watch(underTest.toUntyped) + underTest ! "throw" + probe.expectMsg(PreRestart) + probe.expectNoMessage(smallDuration) + expectNoMessage(smallDuration) + } + + "default to stop supervision for spawn anonymous (from context)" in { + val untyped = system.actorOf(u.Props(new UntypedToTyped())) + val probe = TestProbe() + untyped ! SpawnAnonFromUntyped(ProbedBehavior.behavior(probe.ref)) + val underTest = expectMsgType[TypedSpawnedFromUntypedConext].actorRef + watch(underTest.toUntyped) + underTest ! "throw" + probe.expectMsg(PostStop) + probe.expectNoMessage(smallDuration) + expectTerminated(underTest.toUntyped) + } + + } +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala index 9d992fa383..7da46d4e7a 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala @@ -55,7 +55,8 @@ object AdapterSpec { case "watch" => context.watch(ref) Behaviors.same - case "supervise-stop" => + case "supervise-restart" => + // restart is the default val child = context.actorOf(untyped1) context.watch(child) child ! ThrowIt3 @@ -271,31 +272,6 @@ class AdapterSpec extends AkkaSpec(""" probe.expectMsg("terminated") } - "supervise typed child from untyped parent" in { - val probe = TestProbe() - val ign = system.spawnAnonymous(Behaviors.ignore[Ping]) - val untypedRef = system.actorOf(untyped2(ign, probe.ref)) - - EventFilter[AdapterSpec.ThrowIt1.type](occurrences = 1).intercept { - EventFilter.warning(pattern = """.*received dead letter.*""", occurrences = 1).intercept { - untypedRef ! "supervise-stop" - probe.expectMsg("thrown-stop") - // ping => ok should not get through here - probe.expectMsg("terminated") - } - } - - untypedRef ! "supervise-resume" - probe.expectMsg("thrown-resume") - probe.expectMsg("ok") - - EventFilter[AdapterSpec.ThrowIt3.type](occurrences = 1).intercept { - untypedRef ! "supervise-restart" - probe.expectMsg("thrown-restart") - probe.expectMsg("ok") - } - } - "supervise untyped child from typed parent" in { // FIXME there's a warning with null logged from the untyped empty child here, where does that come from? val probe = TestProbe() @@ -304,11 +280,8 @@ class AdapterSpec extends AkkaSpec(""" // only stop supervisorStrategy EventFilter[AdapterSpec.ThrowIt3.type](occurrences = 1).intercept { - EventFilter.warning(pattern = """.*received dead letter.*""", occurrences = 1).intercept { - typedRef ! "supervise-stop" - probe.expectMsg("terminated") - probe.expectNoMessage(100.millis) // no pong - } + typedRef ! "supervise-restart" + probe.expectMsg("ok") } } diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/coexistence/TypedWatchingUntypedSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/coexistence/TypedWatchingUntypedSpec.scala index 9e7c2a8a7d..6ec5fcffdb 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/coexistence/TypedWatchingUntypedSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/coexistence/TypedWatchingUntypedSpec.scala @@ -6,7 +6,8 @@ package docs.akka.typed.coexistence import akka.actor.typed._ import akka.actor.typed.scaladsl.Behaviors -import akka.testkit.TestKit +import akka.testkit.{ AkkaSpec, TestKit } +import docs.akka.typed.coexistence.TypedWatchingUntypedSpec.Typed //#adapter-import // adds support for typed actors to an untyped actor system and context import akka.actor.typed.scaladsl.adapter._ @@ -17,6 +18,7 @@ import akka.{ actor => untyped } //#import-alias import org.scalatest.WordSpec import scala.concurrent.duration._ +import TypedWatchingUntypedSpec.Typed._ object TypedWatchingUntypedSpec { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index 234f8bf865..7353c57638 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -8,7 +8,7 @@ package adapter import java.lang.reflect.InvocationTargetException -import akka.actor.ActorInitializationException +import akka.actor.{ ActorInitializationException, ActorRefWithCell } import akka.{ actor => untyped } import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.Behavior.StoppedBehavior @@ -34,6 +34,8 @@ import akka.util.OptionVal * the cause and can fill in the cause in the `ChildFailed` signal * Wrapped to avoid it being logged as the typed supervision will already * have logged it. + * + * Should only be thrown if the parent is known to be an `ActorAdapter`. */ final case class TypedActorFailedException(cause: Throwable) extends RuntimeException @@ -46,7 +48,7 @@ import akka.util.OptionVal /** * INTERNAL API */ -@InternalApi private[typed] final class ActorAdapter[T](_initialBehavior: Behavior[T]) +@InternalApi private[typed] final class ActorAdapter[T](_initialBehavior: Behavior[T], rethrowTypedFailure: Boolean) extends untyped.Actor with untyped.ActorLogging { import Behavior._ @@ -146,7 +148,8 @@ import akka.util.OptionVal case BehaviorTags.FailedBehavior => val f = b.asInstanceOf[FailedBehavior] // For the parent untyped supervisor to pick up the exception - throw TypedActorFailedException(f.cause) + if (rethrowTypedFailure) throw TypedActorFailedException(f.cause) + else context.stop(self) case BehaviorTags.StoppedBehavior => val stopped = b.asInstanceOf[StoppedBehavior[T]] behavior = new ComposedStoppingBehavior[T](behavior, stopped) @@ -200,6 +203,12 @@ import akka.util.OptionVal recordChildFailure(cause) untyped.SupervisorStrategy.Stop case ex => + val isTypedActor = sender() match { + case afwc: ActorRefWithCell => + afwc.underlying.props.producer.actorClass == classOf[ActorAdapter[_]] + case _ => + false + } recordChildFailure(ex) val logMessage = ex match { case e: ActorInitializationException if e.getCause ne null => @@ -211,7 +220,10 @@ import akka.util.OptionVal } // log at Error as that is what the supervision strategy would have done. log.error(ex, logMessage) - untyped.SupervisorStrategy.Stop + if (isTypedActor) + untyped.SupervisorStrategy.Stop + else + untyped.SupervisorStrategy.Restart } private def recordChildFailure(ex: Throwable): Unit = { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala index d477f6772a..4024253162 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala @@ -10,11 +10,42 @@ import akka.actor.ExtendedActorSystem import akka.annotation.InternalApi import akka.event.LoggingFilterWithMarker import akka.util.OptionVal -import akka.{ ConfigurationException, actor => untyped } +import akka.{ actor => untyped } import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ +@InternalApi +private[akka] object ActorContextAdapter { + + private def toUntypedImp[U](context: TypedActorContext[_]): untyped.ActorContext = + context match { + case adapter: ActorContextAdapter[_] => adapter.untypedContext + case _ => + throw new UnsupportedOperationException( + "only adapted untyped ActorContext permissible " + + s"($context of class ${context.getClass.getName})") + } + + def toUntyped[U](context: scaladsl.ActorContext[_]): untyped.ActorContext = + context match { + case c: TypedActorContext[_] => toUntypedImp(c) + case _ => + throw new UnsupportedOperationException( + "unknown ActorContext type " + + s"($context of class ${context.getClass.getName})") + } + + def toUntyped[U](context: javadsl.ActorContext[_]): untyped.ActorContext = + context match { + case c: TypedActorContext[_] => toUntypedImp(c) + case _ => + throw new UnsupportedOperationException( + "unknown ActorContext type " + + s"($context of class ${context.getClass.getName})") + } +} + /** * INTERNAL API. Wrapping an [[akka.actor.ActorContext]] as an [[TypedActorContext]]. */ @@ -35,9 +66,9 @@ import scala.concurrent.duration._ override def children: Iterable[ActorRef[Nothing]] = untypedContext.children.map(ActorRefAdapter(_)) override def child(name: String): Option[ActorRef[Nothing]] = untypedContext.child(name).map(ActorRefAdapter(_)) override def spawnAnonymous[U](behavior: Behavior[U], props: Props = Props.empty): ActorRef[U] = - ActorContextAdapter.spawnAnonymous(untypedContext, behavior, props) + ActorRefFactoryAdapter.spawnAnonymous(untypedContext, behavior, props, rethrowTypedFailure = true) override def spawn[U](behavior: Behavior[U], name: String, props: Props = Props.empty): ActorRef[U] = - ActorContextAdapter.spawn(untypedContext, behavior, name, props) + ActorRefFactoryAdapter.spawn(untypedContext, behavior, name, props, rethrowTypedFailure = true) override def stop[U](child: ActorRef[U]): Unit = if (child.path.parent == self.path) { // only if a direct child toUntyped(child) match { @@ -117,59 +148,3 @@ import scala.concurrent.duration._ */ private[akka] override def onUnhandled(msg: T): Unit = adapter.unhandled(msg) } - -/** - * INTERNAL API - */ -@InternalApi private[typed] object ActorContextAdapter { - - private def toUntypedImp[U](context: TypedActorContext[_]): untyped.ActorContext = - context match { - case adapter: ActorContextAdapter[_] => adapter.untypedContext - case _ => - throw new UnsupportedOperationException( - "only adapted untyped ActorContext permissible " + - s"($context of class ${context.getClass.getName})") - } - - def toUntyped2[U](context: TypedActorContext[_]): untyped.ActorContext = toUntypedImp(context) - - def toUntyped[U](context: scaladsl.ActorContext[_]): untyped.ActorContext = - context match { - case c: TypedActorContext[_] => toUntypedImp(c) - case _ => - throw new UnsupportedOperationException( - "unknown ActorContext type " + - s"($context of class ${context.getClass.getName})") - } - - def toUntyped[U](context: javadsl.ActorContext[_]): untyped.ActorContext = - context match { - case c: TypedActorContext[_] => toUntypedImp(c) - case _ => - throw new UnsupportedOperationException( - "unknown ActorContext type " + - s"($context of class ${context.getClass.getName})") - } - - def spawnAnonymous[T](context: akka.actor.ActorContext, behavior: Behavior[T], props: Props): ActorRef[T] = { - try { - Behavior.validateAsInitial(behavior) - ActorRefAdapter(context.actorOf(PropsAdapter(() => behavior, props))) - } catch { - case ex: ConfigurationException if ex.getMessage.startsWith("configuration requested remote deployment") => - throw new ConfigurationException("Remote deployment not allowed for typed actors", ex) - } - } - - def spawn[T](context: akka.actor.ActorContext, behavior: Behavior[T], name: String, props: Props): ActorRef[T] = { - try { - Behavior.validateAsInitial(behavior) - ActorRefAdapter(context.actorOf(PropsAdapter(() => behavior, props), name)) - } catch { - case ex: ConfigurationException if ex.getMessage.startsWith("configuration requested remote deployment") => - throw new ConfigurationException("Remote deployment not allowed for typed actors", ex) - } - } - -} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefFactoryAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefFactoryAdapter.scala new file mode 100644 index 0000000000..7cc05c4c99 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefFactoryAdapter.scala @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.internal.adapter +import akka.actor.typed._ +import akka.annotation.InternalApi +import akka.ConfigurationException + +/** + * INTERNAL API + */ +@InternalApi private[typed] object ActorRefFactoryAdapter { + def spawnAnonymous[T]( + context: akka.actor.ActorRefFactory, + behavior: Behavior[T], + props: Props, + rethrowTypedFailure: Boolean): ActorRef[T] = { + try { + ActorRefAdapter(context.actorOf(internal.adapter.PropsAdapter(() => behavior, props, rethrowTypedFailure))) + } catch { + case ex: ConfigurationException if ex.getMessage.startsWith("configuration requested remote deployment") => + throw new ConfigurationException("Remote deployment not allowed for typed actors", ex) + } + } + + def spawn[T]( + actorRefFactory: akka.actor.ActorRefFactory, + behavior: Behavior[T], + name: String, + props: Props, + rethrowTypedFailure: Boolean): ActorRef[T] = { + try { + ActorRefAdapter( + actorRefFactory.actorOf( + internal.adapter.PropsAdapter(() => Behavior.validateAsInitial(behavior), props, rethrowTypedFailure), + name)) + } catch { + case ex: ConfigurationException if ex.getMessage.startsWith("configuration requested remote deployment") => + throw new ConfigurationException("Remote deployment not allowed for typed actors", ex) + } + } + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala index 8416a2d2c9..4708cf04a3 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala @@ -13,8 +13,11 @@ import akka.annotation.InternalApi * INTERNAL API */ @InternalApi private[akka] object PropsAdapter { - def apply[T](behavior: () => Behavior[T], deploy: Props = Props.empty): akka.actor.Props = { - val props = akka.actor.Props(new ActorAdapter(behavior())) + def apply[T]( + behavior: () => Behavior[T], + deploy: Props = Props.empty, + rethrowTypedFailure: Boolean = true): akka.actor.Props = { + val props = akka.actor.Props(new ActorAdapter(behavior(), rethrowTypedFailure)) (deploy.firstOrElse[DispatcherSelector](DispatcherDefault()) match { case _: DispatcherDefault => props diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Adapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Adapter.scala index 8bdbf892fa..13419c4efc 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Adapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Adapter.scala @@ -30,27 +30,67 @@ import akka.japi.Creator */ object Adapter { + /** + * Spawn the given behavior as a child of the user actor in an untyped ActorSystem. + * Typed actors default supervision strategy is to stop. Can be overridden with + * Behaviors.supervise. + */ def spawnAnonymous[T](sys: akka.actor.ActorSystem, behavior: Behavior[T]): ActorRef[T] = spawnAnonymous(sys, behavior, EmptyProps) + /** + * Spawn the given behavior as a child of the user actor in an untyped ActorSystem. + * Typed actors default supervision strategy is to stop. Can be overridden with + * Behaviors.supervise. + */ def spawnAnonymous[T](sys: akka.actor.ActorSystem, behavior: Behavior[T], props: Props): ActorRef[T] = sys.spawnAnonymous(behavior, props) + /** + * Spawn the given behavior as a child of the user actor in an untyped ActorSystem. + * Typed actors default supervision strategy is to stop. Can be overridden with + * Behaviors.supervise. + */ def spawn[T](sys: akka.actor.ActorSystem, behavior: Behavior[T], name: String): ActorRef[T] = spawn(sys, behavior, name, EmptyProps) + /** + * Spawn the given behavior as a child of the user actor in an untyped ActorSystem. + * Typed actors default supervision strategy is to stop. Can be overridden with + * Behaviors.supervise. + */ def spawn[T](sys: akka.actor.ActorSystem, behavior: Behavior[T], name: String, props: Props): ActorRef[T] = sys.spawn(behavior, name, props) + /** + * Spawn the given behavior as a child of the user actor in an untyped ActorContext. + * Typed actors default supervision strategy is to stop. Can be overridden with + * Behaviors.supervise. + */ def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T]): ActorRef[T] = spawnAnonymous(ctx, behavior, EmptyProps) + /** + * Spawn the given behavior as a child of the user actor in an untyped ActorContext. + * Typed actors default supervision strategy is to stop. Can be overridden with + * Behaviors.supervise. + */ def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], props: Props): ActorRef[T] = ctx.spawnAnonymous(behavior, props) + /** + * Spawn the given behavior as a child of the user actor in an untyped ActorContext. + * Typed actors default supervision strategy is to stop. Can be overridden with + * Behaviors.supervise. + */ def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String): ActorRef[T] = spawn(ctx, behavior, name, EmptyProps) + /** + * Spawn the given behavior as a child of the user actor in an untyped ActorContext. + * Typed actors default supervision strategy is to stop. Can be overridden with + * Behaviors.supervise. + */ def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String, props: Props): ActorRef[T] = ctx.spawn(behavior, name, props) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala index 101d38cf97..3d352c157f 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala @@ -38,12 +38,33 @@ package object adapter { */ implicit class UntypedActorSystemOps(val sys: akka.actor.ActorSystem) extends AnyVal { + /** + * Spawn the given behavior as a child of the user actor in an untyped ActorSystem. + * + * Typed actors default supervision strategy is to stop. Can be overridden with + * `Behaviors.supervise`. + */ def spawnAnonymous[T](behavior: Behavior[T], props: Props = Props.empty): ActorRef[T] = { - ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), props))) + ActorRefFactoryAdapter.spawnAnonymous( + sys, + Behaviors.supervise(behavior).onFailure(SupervisorStrategy.stop), + props, + rethrowTypedFailure = false) } + /** + * Spawn the given behavior as a child of the user actor in an untyped ActorSystem. + * + * Typed actors default supervision strategy is to stop. Can be overridden with + * `Behaviors.supervise`. + */ def spawn[T](behavior: Behavior[T], name: String, props: Props = Props.empty): ActorRef[T] = { - ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), props), name)) + ActorRefFactoryAdapter.spawn( + sys, + Behaviors.supervise(behavior).onFailure(SupervisorStrategy.stop), + name, + props, + rethrowTypedFailure = false) } def toTyped: ActorSystem[Nothing] = AdapterExtension(sys).adapter @@ -70,10 +91,33 @@ package object adapter { * Extension methods added to [[akka.actor.ActorContext]]. */ implicit class UntypedActorContextOps(val ctx: akka.actor.ActorContext) extends AnyVal { + + /** + * Spawn the given behavior as a child of the user actor in an untyped ActorContext. + * + * Typed actors default supervision strategy is to stop. Can be overridden with + * `Behaviors.supervise`. + */ def spawnAnonymous[T](behavior: Behavior[T], props: Props = Props.empty): ActorRef[T] = - ActorContextAdapter.spawnAnonymous(ctx, behavior, props) + ActorRefFactoryAdapter.spawnAnonymous( + ctx, + Behaviors.supervise(behavior).onFailure(SupervisorStrategy.stop), + props, + rethrowTypedFailure = false) + + /** + * Spawn the given behavior as a child of the user actor in an untyped ActorContext. + * + * Typed actors default supervision strategy is to stop. Can be overridden with + * `Behaviors.supervise`. + */ def spawn[T](behavior: Behavior[T], name: String, props: Props = Props.empty): ActorRef[T] = - ActorContextAdapter.spawn(ctx, behavior, name, props) + ActorRefFactoryAdapter.spawn( + ctx, + Behaviors.supervise(behavior).onFailure(SupervisorStrategy.stop), + name, + props, + rethrowTypedFailure = false) def watch[U](other: ActorRef[U]): Unit = ctx.watch(ActorRefAdapter.toUntyped(other)) def unwatch[U](other: ActorRef[U]): Unit = ctx.unwatch(ActorRefAdapter.toUntyped(other)) @@ -88,6 +132,7 @@ package object adapter { implicit class TypedActorContextOps(val ctx: scaladsl.ActorContext[_]) extends AnyVal { def actorOf(props: akka.actor.Props): akka.actor.ActorRef = ActorContextAdapter.toUntyped(ctx).actorOf(props) + def actorOf(props: akka.actor.Props, name: String): akka.actor.ActorRef = ActorContextAdapter.toUntyped(ctx).actorOf(props, name) diff --git a/akka-docs/src/main/paradox/typed/coexisting.md b/akka-docs/src/main/paradox/typed/coexisting.md index 2e2737dcb2..5c92412191 100644 --- a/akka-docs/src/main/paradox/typed/coexisting.md +++ b/akka-docs/src/main/paradox/typed/coexisting.md @@ -115,6 +115,12 @@ Scala Java : @@snip [TypedWatchingUntypedTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/coexistence/TypedWatchingUntypedTest.java) { #typed } -There is one caveat regarding supervision of untyped child from typed parent. If the child throws an exception we would expect it to be restarted, but supervision in Akka Typed defaults to stopping the child in case it fails. The restarting facilities in Akka Typed will not work with untyped children. However, the workaround is to add another untyped actor that takes care of the supervision, i.e. restarts in case of failure if that is the desired behavior. +## Supervision + +The default supervision for untyped actors is to restart where as for typed it is to stop. +When combining untyped and and typed actors the default supervision is based on the default behavior of +the child i.e. if an untyped actor creates a typed child, its default supervision will be to stop. If a typed +actor creates an untyped child, its default supervision will be to restart. +