From e26a90f340120fd35a02a0ca023e5dbfed319b2b Mon Sep 17 00:00:00 2001 From: Dolly Gyanchandani <10329479+Dollyg@users.noreply.github.com> Date: Thu, 18 Oct 2018 18:33:27 +0530 Subject: [PATCH] Create typed ActorMaterializer from ActorContext, ##25536 * Create typed ActorMaterializer from ActorContext to bind stream's lifecycle with an actor's lifecycle * Add Java API for creating typed ActorMaterializer from ActorContext --- .../akka/actor/typed/javadsl/Adapter.scala | 4 +++ .../typed/scaladsl/adapter/package.scala | 2 ++ .../javadsl/ActorMaterializerFactory.scala | 31 +++++++++++++++++++ .../typed/scaladsl/ActorMaterializer.scala | 16 ++++++++++ .../stream/typed/scaladsl/ActorFlowSpec.scala | 1 - .../CustomGuardianAndMaterializerSpec.scala | 26 +++++++++++++--- 6 files changed, 74 insertions(+), 6 deletions(-) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Adapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Adapter.scala index 87a28e67b6..740bc5483c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Adapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Adapter.scala @@ -4,6 +4,7 @@ package akka.actor.typed.javadsl +import akka.actor import akka.actor.typed.Behavior import akka.actor.typed.Props import akka.actor.typed.EmptyProps @@ -59,6 +60,9 @@ object Adapter { def toUntyped(sys: ActorSystem[_]): akka.actor.ActorSystem = sys.toUntyped + def toUntyped(ctx: ActorContext[_]): actor.ActorContext = + ActorContextAdapter.toUntyped(ctx) + def watch[U](ctx: akka.actor.ActorContext, other: ActorRef[U]): Unit = ctx.watch(other) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala index 34083a3b2a..acf34775b6 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala @@ -88,6 +88,8 @@ package object adapter { def actorOf(props: akka.actor.Props, name: String): akka.actor.ActorRef = ActorContextAdapter.toUntyped(ctx).actorOf(props, name) + def toUntyped: akka.actor.ActorContext = ActorContextAdapter.toUntyped(ctx) + // watch, unwatch and stop not needed here because of the implicit ActorRef conversion } diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorMaterializerFactory.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorMaterializerFactory.scala index 6c112dc25b..08f28243b8 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorMaterializerFactory.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorMaterializerFactory.scala @@ -5,6 +5,7 @@ package akka.stream.typed.javadsl import akka.actor.typed.ActorSystem +import akka.actor.typed.javadsl.{ ActorContext, Adapter } import akka.stream.ActorMaterializerSettings object ActorMaterializerFactory { @@ -43,4 +44,34 @@ object ActorMaterializerFactory { def create[T](settings: ActorMaterializerSettings, namePrefix: String, actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer = akka.stream.ActorMaterializer.create(settings, actorSystem.toUntyped, namePrefix) + /** + * Creates an `ActorMaterializer` which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The lifecycle of the materialized streams + * will be bound to the lifecycle of the provided [[akka.actor.typed.javadsl.ActorContext]] + * + * Defaults the actor name prefix used to name actors running the processing steps to `"flow"`. + * The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def create[T](ctx: ActorContext[T]): akka.stream.ActorMaterializer = + akka.stream.ActorMaterializer.create(Adapter.toUntyped(ctx)) + + /** + * Creates an `ActorMaterializer` which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The lifecycle of the materialized streams + * will be bound to the lifecycle of the provided [[akka.actor.typed.javadsl.ActorContext]] + */ + def create[T](settings: ActorMaterializerSettings, ctx: ActorContext[T]): akka.stream.ActorMaterializer = + akka.stream.ActorMaterializer.create(settings, Adapter.toUntyped(ctx)) + + /** + * Creates an `ActorMaterializer` which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The lifecycle of the materialized streams + * will be bound to the lifecycle of the provided [[akka.actor.typed.javadsl.ActorContext]] + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def create[T](settings: ActorMaterializerSettings, namePrefix: String, ctx: ActorContext[T]): akka.stream.ActorMaterializer = + akka.stream.ActorMaterializer.create(settings, Adapter.toUntyped(ctx), namePrefix) } diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorMaterializer.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorMaterializer.scala index 1b5fb6a42f..da2ddfdd02 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorMaterializer.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorMaterializer.scala @@ -5,6 +5,7 @@ package akka.stream.typed.scaladsl import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.ActorContext import akka.stream.ActorMaterializerSettings object ActorMaterializer { @@ -25,4 +26,19 @@ object ActorMaterializer { def apply[T](materializerSettings: Option[ActorMaterializerSettings] = None, namePrefix: Option[String] = None)(implicit actorSystem: ActorSystem[T]): ActorMaterializer = akka.stream.ActorMaterializer(materializerSettings, namePrefix)(actorSystem.toUntyped) + /** + * Creates an `ActorMaterializer` which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The lifecycle of the materialized streams + * will be bound to the lifecycle of the provided [[akka.actor.typed.scaladsl.ActorContext]] + * + * The materializer's [[akka.stream.ActorMaterializerSettings]] will be obtained from the + * configuration of the `context`'s underlying [[akka.actor.typed.ActorSystem]]. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def boundToActor[T](ctx: ActorContext[T], materializerSettings: Option[ActorMaterializerSettings] = None, namePrefix: Option[String] = None): ActorMaterializer = + akka.stream.ActorMaterializer(materializerSettings, namePrefix)(ctx.toUntyped) + } diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorFlowSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorFlowSpec.scala index 4bf6e0f669..02dd79a142 100644 --- a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorFlowSpec.scala +++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorFlowSpec.scala @@ -5,7 +5,6 @@ package akka.stream.typed.scaladsl //#imports -import akka.stream.typed.scaladsl.ActorMaterializer import akka.stream.scaladsl._ import akka.actor.typed.ActorRef import akka.actor.typed.scaladsl.Behaviors diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/CustomGuardianAndMaterializerSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/CustomGuardianAndMaterializerSpec.scala index 334b2aaf04..a2a30a6606 100644 --- a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/CustomGuardianAndMaterializerSpec.scala +++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/CustomGuardianAndMaterializerSpec.scala @@ -4,14 +4,16 @@ package akka.stream.typed.scaladsl -import scala.concurrent.Future +import akka.Done +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorRef import akka.actor.typed.scaladsl.Behaviors -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.stream.AbruptStageTerminationException +import akka.stream.scaladsl.{ Sink, Source } import org.scalatest.WordSpecLike +import scala.concurrent.Future + object CustomGuardianAndMaterializerSpec { sealed trait GuardianProtocol @@ -38,6 +40,20 @@ class CustomGuardianAndMaterializerSpec extends ScalaTestWithActorTestKit with W it.futureValue should ===("hello") } - } + "should kill streams with bound actor context" in { + var doneF: Future[Done] = null + val behavior = + Behaviors.setup[String] { ctx ⇒ + implicit val mat: ActorMaterializer = ActorMaterializer.boundToActor(ctx) + doneF = Source.repeat("hello").runWith(Sink.ignore) + Behaviors.receiveMessage[String](_ ⇒ Behaviors.stopped) + } + + val actorRef = spawn(behavior) + + actorRef ! "kill" + eventually(doneF.failed.futureValue shouldBe an[AbruptStageTerminationException]) + } + } }