From dd62071ff8e28622d6afbf680ae6aa41af9e91ae Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Mon, 22 Jan 2018 21:51:49 +0900 Subject: [PATCH] =str #24298 ActorMaterializer now starts actors under /system unless inside ActorContext, in which case it still is child actors as usual This makes sense as they're "internal", so more like system actors anyway, but the major reason for the change is Akka Typed, in which we do not control the user guardian, and as such can not attach things from the side into it --- .../typed/scaladsl/ActorSourceSinkSpec.scala | 29 ++++-------- .../CustomGuardianAndMaterializerSpec.scala | 44 +++++++++++++++++++ .../scala/akka/stream/ActorMaterializer.scala | 13 +++++- .../scala/akka/testkit/typed/TestKit.scala | 2 +- 4 files changed, 65 insertions(+), 23 deletions(-) create mode 100644 akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/CustomGuardianAndMaterializerSpec.scala diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala index 8d5ba79249..958240326c 100644 --- a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala +++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala @@ -3,16 +3,16 @@ */ package akka.stream.typed.scaladsl +import akka.actor.typed.ActorRef +import akka.actor.typed.TypedAkkaSpecWithShutdown import akka.actor.typed.scaladsl.Behaviors import akka.stream.OverflowStrategy -import akka.actor.typed.{ ActorRef, ActorSystem } -import akka.testkit.TestKit -import akka.testkit.typed.scaladsl._ -import akka.stream.scaladsl.{ Keep, Sink, Source } +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source import akka.stream.typed.ActorMaterializer -import akka.testkit.typed.TestKitSettings -import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } -import org.scalatest.concurrent.ScalaFutures +import akka.testkit.typed.TestKit +import akka.testkit.typed.scaladsl._ object ActorSourceSinkSpec { @@ -23,22 +23,11 @@ object ActorSourceSinkSpec { case object Failed extends AckProto } -class ActorSourceSinkSpec extends TestKit(akka.actor.ActorSystem("ActorSourceSinkSpec")) with WordSpecLike with BeforeAndAfterAll with Matchers with ScalaFutures { +class ActorSourceSinkSpec extends TestKit with TypedAkkaSpecWithShutdown { import ActorSourceSinkSpec._ - import akka.actor.typed.scaladsl.adapter._ - // FIXME use Typed Teskit - // The materializer creates a top-level actor when materializing a stream. - // Currently that is not supported, because a Typed Teskit uses a typed actor system - // with a custom guardian. Because of custom guardian, an exception is being thrown - // when trying to create a top level actor during materialization. - implicit val sys = ActorSystem.wrap(system) - implicit val testkitSettings = TestKitSettings(sys) implicit val mat = ActorMaterializer() - override protected def afterAll(): Unit = - sys.terminate() - "ActorSink" should { "accept messages" in { @@ -76,7 +65,7 @@ class ActorSourceSinkSpec extends TestKit(akka.actor.ActorSystem("ActorSourceSin } } - val pilotRef: ActorRef[AckProto] = system.actorOf(PropsAdapter(autoPilot)) + val pilotRef: ActorRef[AckProto] = spawn(autoPilot) val in = Source.queue[String](10, OverflowStrategy.dropBuffer) diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/CustomGuardianAndMaterializerSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/CustomGuardianAndMaterializerSpec.scala new file mode 100644 index 0000000000..2b83bfad24 --- /dev/null +++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/CustomGuardianAndMaterializerSpec.scala @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.stream.typed.scaladsl + +import scala.concurrent.Future + +import akka.actor.typed.ActorRef +import akka.actor.typed.TypedAkkaSpecWithShutdown +import akka.actor.typed.scaladsl.Behaviors +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.typed.ActorMaterializer +import akka.testkit.typed.TestKit + +object CustomGuardianAndMaterializerSpec { + + sealed trait GuardianProtocol + case class Init(sender: ActorRef[String]) extends GuardianProtocol + case class Msg(sender: ActorRef[String], msg: String) extends GuardianProtocol + case object Complete extends GuardianProtocol + case object Failed extends GuardianProtocol +} + +class CustomGuardianAndMaterializerSpec extends TestKit with TypedAkkaSpecWithShutdown { + import CustomGuardianAndMaterializerSpec._ + + val guardian = Behaviors.immutable[GuardianProtocol] { + (_, msg) ⇒ Behaviors.same + } + + implicit val mat = ActorMaterializer() + + "ActorMaterializer" should { + + "work with typed ActorSystem with custom guardian" in { + val it: Future[String] = Source.single("hello").runWith(Sink.head) + + it.futureValue should ===("hello") + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index cb9e8731c8..ef542d0372 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -6,7 +6,7 @@ package akka.stream import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean -import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props } +import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ActorSystemImpl, ExtendedActorSystem, Props } import akka.event.LoggingAdapter import akka.util.Helpers.toRootLowerCase import akka.stream.impl._ @@ -60,11 +60,20 @@ object ActorMaterializer { system, materializerSettings, system.dispatchers, - context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()), + actorOfStreamSupervisor(materializerSettings, context, haveShutDown), haveShutDown, FlowNames(system).name.copy(namePrefix)) } + private def actorOfStreamSupervisor(materializerSettings: ActorMaterializerSettings, context: ActorRefFactory, haveShutDown: AtomicBoolean) = + context match { + case s: ExtendedActorSystem ⇒ + s.systemActorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()) + + case a: ActorContext ⇒ + a.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()) + } + /** * Scala API: * Scala API: Creates an ActorMaterializer that can materialize stream blueprints as running streams. * diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala index 28b4547ca8..a817d63a96 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala @@ -66,7 +66,7 @@ class TestKit(name: String, config: Option[Config]) extends TestKitBase { def this(name: String, config: Config) = this(name, Some(config)) import TestKit._ - implicit val system = ActorSystem(testKitGuardian, name, config = config) + implicit val system: ActorSystem[TestKitCommand] = ActorSystem(testKitGuardian, name, config = config) } @ApiMayChange