diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala index 8d5ba79249..958240326c 100644 --- a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala +++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala @@ -3,16 +3,16 @@ */ package akka.stream.typed.scaladsl +import akka.actor.typed.ActorRef +import akka.actor.typed.TypedAkkaSpecWithShutdown import akka.actor.typed.scaladsl.Behaviors import akka.stream.OverflowStrategy -import akka.actor.typed.{ ActorRef, ActorSystem } -import akka.testkit.TestKit -import akka.testkit.typed.scaladsl._ -import akka.stream.scaladsl.{ Keep, Sink, Source } +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source import akka.stream.typed.ActorMaterializer -import akka.testkit.typed.TestKitSettings -import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } -import org.scalatest.concurrent.ScalaFutures +import akka.testkit.typed.TestKit +import akka.testkit.typed.scaladsl._ object ActorSourceSinkSpec { @@ -23,22 +23,11 @@ object ActorSourceSinkSpec { case object Failed extends AckProto } -class ActorSourceSinkSpec extends TestKit(akka.actor.ActorSystem("ActorSourceSinkSpec")) with WordSpecLike with BeforeAndAfterAll with Matchers with ScalaFutures { +class ActorSourceSinkSpec extends TestKit with TypedAkkaSpecWithShutdown { import ActorSourceSinkSpec._ - import akka.actor.typed.scaladsl.adapter._ - // FIXME use Typed Teskit - // The materializer creates a top-level actor when materializing a stream. - // Currently that is not supported, because a Typed Teskit uses a typed actor system - // with a custom guardian. Because of custom guardian, an exception is being thrown - // when trying to create a top level actor during materialization. - implicit val sys = ActorSystem.wrap(system) - implicit val testkitSettings = TestKitSettings(sys) implicit val mat = ActorMaterializer() - override protected def afterAll(): Unit = - sys.terminate() - "ActorSink" should { "accept messages" in { @@ -76,7 +65,7 @@ class ActorSourceSinkSpec extends TestKit(akka.actor.ActorSystem("ActorSourceSin } } - val pilotRef: ActorRef[AckProto] = system.actorOf(PropsAdapter(autoPilot)) + val pilotRef: ActorRef[AckProto] = spawn(autoPilot) val in = Source.queue[String](10, OverflowStrategy.dropBuffer) diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/CustomGuardianAndMaterializerSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/CustomGuardianAndMaterializerSpec.scala new file mode 100644 index 0000000000..2b83bfad24 --- /dev/null +++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/CustomGuardianAndMaterializerSpec.scala @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.stream.typed.scaladsl + +import scala.concurrent.Future + +import akka.actor.typed.ActorRef +import akka.actor.typed.TypedAkkaSpecWithShutdown +import akka.actor.typed.scaladsl.Behaviors +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.typed.ActorMaterializer +import akka.testkit.typed.TestKit + +object CustomGuardianAndMaterializerSpec { + + sealed trait GuardianProtocol + case class Init(sender: ActorRef[String]) extends GuardianProtocol + case class Msg(sender: ActorRef[String], msg: String) extends GuardianProtocol + case object Complete extends GuardianProtocol + case object Failed extends GuardianProtocol +} + +class CustomGuardianAndMaterializerSpec extends TestKit with TypedAkkaSpecWithShutdown { + import CustomGuardianAndMaterializerSpec._ + + val guardian = Behaviors.immutable[GuardianProtocol] { + (_, msg) ⇒ Behaviors.same + } + + implicit val mat = ActorMaterializer() + + "ActorMaterializer" should { + + "work with typed ActorSystem with custom guardian" in { + val it: Future[String] = Source.single("hello").runWith(Sink.head) + + it.futureValue should ===("hello") + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index cb9e8731c8..ef542d0372 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -6,7 +6,7 @@ package akka.stream import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean -import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props } +import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ActorSystemImpl, ExtendedActorSystem, Props } import akka.event.LoggingAdapter import akka.util.Helpers.toRootLowerCase import akka.stream.impl._ @@ -60,11 +60,20 @@ object ActorMaterializer { system, materializerSettings, system.dispatchers, - context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()), + actorOfStreamSupervisor(materializerSettings, context, haveShutDown), haveShutDown, FlowNames(system).name.copy(namePrefix)) } + private def actorOfStreamSupervisor(materializerSettings: ActorMaterializerSettings, context: ActorRefFactory, haveShutDown: AtomicBoolean) = + context match { + case s: ExtendedActorSystem ⇒ + s.systemActorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()) + + case a: ActorContext ⇒ + a.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()) + } + /** * Scala API: * Scala API: Creates an ActorMaterializer that can materialize stream blueprints as running streams. * diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala index 28b4547ca8..a817d63a96 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala @@ -66,7 +66,7 @@ class TestKit(name: String, config: Option[Config]) extends TestKitBase { def this(name: String, config: Config) = this(name, Some(config)) import TestKit._ - implicit val system = ActorSystem(testKitGuardian, name, config = config) + implicit val system: ActorSystem[TestKitCommand] = ActorSystem(testKitGuardian, name, config = config) } @ApiMayChange