=str #24298 ActorMaterializer now starts actors under /system unless
inside ActorContext, in which case it still is child actors as usual This makes sense as they're "internal", so more like system actors anyway, but the major reason for the change is Akka Typed, in which we do not control the user guardian, and as such can not attach things from the side into it
This commit is contained in:
parent
cbe0215c41
commit
dd62071ff8
4 changed files with 65 additions and 23 deletions
|
|
@ -3,16 +3,16 @@
|
||||||
*/
|
*/
|
||||||
package akka.stream.typed.scaladsl
|
package akka.stream.typed.scaladsl
|
||||||
|
|
||||||
|
import akka.actor.typed.ActorRef
|
||||||
|
import akka.actor.typed.TypedAkkaSpecWithShutdown
|
||||||
import akka.actor.typed.scaladsl.Behaviors
|
import akka.actor.typed.scaladsl.Behaviors
|
||||||
import akka.stream.OverflowStrategy
|
import akka.stream.OverflowStrategy
|
||||||
import akka.actor.typed.{ ActorRef, ActorSystem }
|
import akka.stream.scaladsl.Keep
|
||||||
import akka.testkit.TestKit
|
import akka.stream.scaladsl.Sink
|
||||||
import akka.testkit.typed.scaladsl._
|
import akka.stream.scaladsl.Source
|
||||||
import akka.stream.scaladsl.{ Keep, Sink, Source }
|
|
||||||
import akka.stream.typed.ActorMaterializer
|
import akka.stream.typed.ActorMaterializer
|
||||||
import akka.testkit.typed.TestKitSettings
|
import akka.testkit.typed.TestKit
|
||||||
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
|
import akka.testkit.typed.scaladsl._
|
||||||
import org.scalatest.concurrent.ScalaFutures
|
|
||||||
|
|
||||||
object ActorSourceSinkSpec {
|
object ActorSourceSinkSpec {
|
||||||
|
|
||||||
|
|
@ -23,22 +23,11 @@ object ActorSourceSinkSpec {
|
||||||
case object Failed extends AckProto
|
case object Failed extends AckProto
|
||||||
}
|
}
|
||||||
|
|
||||||
class ActorSourceSinkSpec extends TestKit(akka.actor.ActorSystem("ActorSourceSinkSpec")) with WordSpecLike with BeforeAndAfterAll with Matchers with ScalaFutures {
|
class ActorSourceSinkSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
||||||
import ActorSourceSinkSpec._
|
import ActorSourceSinkSpec._
|
||||||
import akka.actor.typed.scaladsl.adapter._
|
|
||||||
|
|
||||||
// FIXME use Typed Teskit
|
|
||||||
// The materializer creates a top-level actor when materializing a stream.
|
|
||||||
// Currently that is not supported, because a Typed Teskit uses a typed actor system
|
|
||||||
// with a custom guardian. Because of custom guardian, an exception is being thrown
|
|
||||||
// when trying to create a top level actor during materialization.
|
|
||||||
implicit val sys = ActorSystem.wrap(system)
|
|
||||||
implicit val testkitSettings = TestKitSettings(sys)
|
|
||||||
implicit val mat = ActorMaterializer()
|
implicit val mat = ActorMaterializer()
|
||||||
|
|
||||||
override protected def afterAll(): Unit =
|
|
||||||
sys.terminate()
|
|
||||||
|
|
||||||
"ActorSink" should {
|
"ActorSink" should {
|
||||||
|
|
||||||
"accept messages" in {
|
"accept messages" in {
|
||||||
|
|
@ -76,7 +65,7 @@ class ActorSourceSinkSpec extends TestKit(akka.actor.ActorSystem("ActorSourceSin
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val pilotRef: ActorRef[AckProto] = system.actorOf(PropsAdapter(autoPilot))
|
val pilotRef: ActorRef[AckProto] = spawn(autoPilot)
|
||||||
|
|
||||||
val in =
|
val in =
|
||||||
Source.queue[String](10, OverflowStrategy.dropBuffer)
|
Source.queue[String](10, OverflowStrategy.dropBuffer)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,44 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||||
|
*/
|
||||||
|
package akka.stream.typed.scaladsl
|
||||||
|
|
||||||
|
import scala.concurrent.Future
|
||||||
|
|
||||||
|
import akka.actor.typed.ActorRef
|
||||||
|
import akka.actor.typed.TypedAkkaSpecWithShutdown
|
||||||
|
import akka.actor.typed.scaladsl.Behaviors
|
||||||
|
import akka.stream.scaladsl.Sink
|
||||||
|
import akka.stream.scaladsl.Source
|
||||||
|
import akka.stream.typed.ActorMaterializer
|
||||||
|
import akka.testkit.typed.TestKit
|
||||||
|
|
||||||
|
object CustomGuardianAndMaterializerSpec {
|
||||||
|
|
||||||
|
sealed trait GuardianProtocol
|
||||||
|
case class Init(sender: ActorRef[String]) extends GuardianProtocol
|
||||||
|
case class Msg(sender: ActorRef[String], msg: String) extends GuardianProtocol
|
||||||
|
case object Complete extends GuardianProtocol
|
||||||
|
case object Failed extends GuardianProtocol
|
||||||
|
}
|
||||||
|
|
||||||
|
class CustomGuardianAndMaterializerSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
||||||
|
import CustomGuardianAndMaterializerSpec._
|
||||||
|
|
||||||
|
val guardian = Behaviors.immutable[GuardianProtocol] {
|
||||||
|
(_, msg) ⇒ Behaviors.same
|
||||||
|
}
|
||||||
|
|
||||||
|
implicit val mat = ActorMaterializer()
|
||||||
|
|
||||||
|
"ActorMaterializer" should {
|
||||||
|
|
||||||
|
"work with typed ActorSystem with custom guardian" in {
|
||||||
|
val it: Future[String] = Source.single("hello").runWith(Sink.head)
|
||||||
|
|
||||||
|
it.futureValue should ===("hello")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.stream
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
|
|
||||||
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props }
|
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ActorSystemImpl, ExtendedActorSystem, Props }
|
||||||
import akka.event.LoggingAdapter
|
import akka.event.LoggingAdapter
|
||||||
import akka.util.Helpers.toRootLowerCase
|
import akka.util.Helpers.toRootLowerCase
|
||||||
import akka.stream.impl._
|
import akka.stream.impl._
|
||||||
|
|
@ -60,11 +60,20 @@ object ActorMaterializer {
|
||||||
system,
|
system,
|
||||||
materializerSettings,
|
materializerSettings,
|
||||||
system.dispatchers,
|
system.dispatchers,
|
||||||
context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()),
|
actorOfStreamSupervisor(materializerSettings, context, haveShutDown),
|
||||||
haveShutDown,
|
haveShutDown,
|
||||||
FlowNames(system).name.copy(namePrefix))
|
FlowNames(system).name.copy(namePrefix))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def actorOfStreamSupervisor(materializerSettings: ActorMaterializerSettings, context: ActorRefFactory, haveShutDown: AtomicBoolean) =
|
||||||
|
context match {
|
||||||
|
case s: ExtendedActorSystem ⇒
|
||||||
|
s.systemActorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName())
|
||||||
|
|
||||||
|
case a: ActorContext ⇒
|
||||||
|
a.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName())
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scala API: * Scala API: Creates an ActorMaterializer that can materialize stream blueprints as running streams.
|
* Scala API: * Scala API: Creates an ActorMaterializer that can materialize stream blueprints as running streams.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -66,7 +66,7 @@ class TestKit(name: String, config: Option[Config]) extends TestKitBase {
|
||||||
def this(name: String, config: Config) = this(name, Some(config))
|
def this(name: String, config: Config) = this(name, Some(config))
|
||||||
|
|
||||||
import TestKit._
|
import TestKit._
|
||||||
implicit val system = ActorSystem(testKitGuardian, name, config = config)
|
implicit val system: ActorSystem[TestKitCommand] = ActorSystem(testKitGuardian, name, config = config)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiMayChange
|
@ApiMayChange
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue