diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala index 1feb87e0ea..6bbae2492f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala @@ -187,7 +187,6 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { pending val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val materializer = ActorMaterializer()(sys) - implicit val timeout = Timeout(500.millis) try { val p = FileIO.fromPath(manyLines) @@ -195,7 +194,7 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { .runWith(TestSink.probe)(materializer) materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get + val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get try assertDispatcher(ref, "akka.actor.default-dispatcher") finally p.cancel() } finally shutdown(sys) } diff --git a/akka-stream/src/main/scala/akka/stream/Materializer.scala b/akka-stream/src/main/scala/akka/stream/Materializer.scala index 2e3067f3bf..8c170c01bf 100644 --- a/akka-stream/src/main/scala/akka/stream/Materializer.scala +++ b/akka-stream/src/main/scala/akka/stream/Materializer.scala @@ -101,4 +101,4 @@ private[akka] object NoMaterializer extends Materializer { case class MaterializationContext( materializer: Materializer, effectiveAttributes: Attributes, - stageName: String) + islandName: String) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 1e3864f6c3..c9ee63d252 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -43,23 +43,22 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer { val dispatcher = if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) effectiveSettings(context.effectiveAttributes).dispatcher else props.dispatcher - actorOf(props, context.stageName, dispatcher) + actorOf(props.withDispatcher(dispatcher), context.islandName) } /** * INTERNAL API */ - // TODO: hide it again - def actorOf(props: Props, name: String, dispatcher: String): ActorRef = { + def actorOf(props: Props, name: String): ActorRef = { supervisor match { case ref: LocalActorRef ⇒ - ref.underlying.attachChild(props.withDispatcher(dispatcher), name, systemService = false) + ref.underlying.attachChild(props, name, systemService = false) case ref: RepointableActorRef ⇒ if (ref.isStarted) - ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher), name, systemService = false) + ref.underlying.asInstanceOf[ActorCell].attachChild(props, name, systemService = false) else { implicit val timeout = ref.system.settings.CreationTimeout - val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(dispatcher), name)).mapTo[ActorRef] + val f = (supervisor ? StreamSupervisor.Materialize(props, name)).mapTo[ActorRef] Await.result(f, timeout.duration) } case unknown ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 0dc573d665..62e5988e8c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -26,36 +26,42 @@ object PhasedFusingActorMaterializer { val Debug = false val DefaultPhase: Phase[Any] = new Phase[Any] { - override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[Any] = - new GraphStageIsland(settings, materializer).asInstanceOf[PhaseIsland[Any]] + override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, + islandName: String): PhaseIsland[Any] = + new GraphStageIsland(settings, materializer, islandName).asInstanceOf[PhaseIsland[Any]] } val DefaultPhases: Map[IslandTag, Phase[Any]] = Map[IslandTag, Phase[Any]]( SinkModuleIslandTag → new Phase[Any] { - override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[Any] = - (new SinkModulePhase(materializer)).asInstanceOf[PhaseIsland[Any]] + override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, + islandName: String): PhaseIsland[Any] = + (new SinkModulePhase(materializer, islandName)).asInstanceOf[PhaseIsland[Any]] }, SourceModuleIslandTag → new Phase[Any] { - override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[Any] = - new SourceModulePhase(materializer).asInstanceOf[PhaseIsland[Any]] + override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, + islandName: String): PhaseIsland[Any] = + new SourceModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]] }, ProcessorModuleIslandTag → new Phase[Any] { - override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[Any] = - new ProcessorModulePhase(materializer).asInstanceOf[PhaseIsland[Any]] + override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, + islandName: String): PhaseIsland[Any] = + new ProcessorModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]] }, - GraphStageTag → DefaultPhase - ) + GraphStageTag → DefaultPhase) def apply(settings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = { val haveShutDown = new AtomicBoolean(false) val system = actorSystemOf(context) val materializerSettings = ActorMaterializerSettings(system) + val streamSupervisor = context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown) + .withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()) + PhasedFusingActorMaterializer( system, materializerSettings, system.dispatchers, - context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()), + streamSupervisor, haveShutDown, FlowNames(system).name.copy("flow")) } @@ -105,11 +111,18 @@ class IslandTracking( val phases: Map[IslandTag, Phase[Any]], val settings: ActorMaterializerSettings, defaultPhase: Phase[Any], - val materializer: PhasedFusingActorMaterializer -) { + val materializer: PhasedFusingActorMaterializer, + islandNamePrefix: String) { import PhasedFusingActorMaterializer.Debug + private var stageCounter = 0 + private def nextStageName(): String = { + val s = islandNamePrefix + stageCounter + stageCounter += 1 + s + } + private var currentGlobalOffset = 0 private var currentSegmentGlobalOffset = 0 private var currentIslandGlobalOffset = 0 @@ -120,7 +133,7 @@ class IslandTracking( private var segments: java.util.ArrayList[SegmentInfo] = null private var forwardWires: java.util.ArrayList[ForwardWire] = null - private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, materializer) + private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, materializer, nextStageName()) def getCurrentPhase: PhaseIsland[Any] = currentPhase def getCurrentOffset: Int = currentGlobalOffset @@ -135,8 +148,7 @@ class IslandTracking( length = currentGlobalOffset - currentSegmentGlobalOffset, globalBaseOffset = currentSegmentGlobalOffset, relativeBaseOffset = currentSegmentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippetSlots, - currentPhase - ) + currentPhase) // Segment tracking is by demand, we only allocate this list if it is used. // If there are no islands, then there is no need to track segments @@ -155,20 +167,8 @@ class IslandTracking( val previousPhase = currentPhase val previousIslandOffset = currentIslandGlobalOffset - val effectiveSettings: ActorMaterializerSettings = { - import Attributes._ - import ActorAttributes._ - attributes.attributeList.foldLeft(settings) { (s, attr) ⇒ - attr match { - case InputBuffer(initial, max) ⇒ s.withInputBuffer(initial, max) - case Dispatcher(dispatcher) ⇒ s.withDispatcher(dispatcher) - case SupervisionStrategy(decider) ⇒ s.withSupervisionStrategy(decider) - case _ ⇒ s - } - } - } - - currentPhase = phases(tag)(effectiveSettings, materializer) + val effectiveSettings = materializer.effectiveSettings(attributes) + currentPhase = phases(tag)(effectiveSettings, materializer, nextStageName()) if (Debug) println(s"Entering island starting at offset = $currentIslandGlobalOffset phase = $currentPhase") // Resolve the phase to be used to materialize this island @@ -293,8 +293,7 @@ class IslandTracking( from = out, toGlobalOffset = absoluteOffset, logic, - currentPhase - ) + currentPhase) if (Debug) println(s" wiring is forward, recording $forwardWire") forwardWires.add(forwardWire) @@ -310,8 +309,7 @@ case class PhasedFusingActorMaterializer( dispatchers: Dispatchers, supervisor: ActorRef, haveShutDown: AtomicBoolean, - flowNames: SeqActorName -) extends ExtendedActorMaterializer { + flowNames: SeqActorName) extends ExtendedActorMaterializer { import PhasedFusingActorMaterializer._ private val _logger = Logging.getLogger(system, this) @@ -382,8 +380,7 @@ case class PhasedFusingActorMaterializer( subflowFuser, initialAttributes, PhasedFusingActorMaterializer.DefaultPhase, - PhasedFusingActorMaterializer.DefaultPhases - ) + PhasedFusingActorMaterializer.DefaultPhases) } def materialize[Mat]( @@ -391,9 +388,8 @@ case class PhasedFusingActorMaterializer( subflowFuser: GraphInterpreterShell ⇒ ActorRef, initialAttributes: Attributes, defaultPhase: Phase[Any], - phases: Map[IslandTag, Phase[Any]] - ): Mat = { - val islandTracking = new IslandTracking(phases, settings, defaultPhase, this) + phases: Map[IslandTag, Phase[Any]]): Mat = { + val islandTracking = new IslandTracking(phases, settings, defaultPhase, this, islandNamePrefix = createFlowName() + "-") var current: Traversal = graph.traversalBuilder.traversal @@ -493,7 +489,10 @@ case class PhasedFusingActorMaterializer( trait IslandTag trait Phase[M] { - def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[M] + def apply( + effectiveSettings: ActorMaterializerSettings, + materializer: PhasedFusingActorMaterializer, + islandName: String): PhaseIsland[M] } trait PhaseIsland[M] { @@ -517,9 +516,9 @@ trait PhaseIsland[M] { object GraphStageTag extends IslandTag final class GraphStageIsland( - settings: ActorMaterializerSettings, - materializer: PhasedFusingActorMaterializer -) extends PhaseIsland[GraphStageLogic] { + effectiveSettings: ActorMaterializerSettings, + materializer: PhasedFusingActorMaterializer, + islandName: String) extends PhaseIsland[GraphStageLogic] { // TODO: remove these private val logicArrayType = Array.empty[GraphStageLogic] private[this] val logics = new ArrayList[GraphStageLogic](64) @@ -531,7 +530,7 @@ final class GraphStageIsland( val shell = new GraphInterpreterShell( connections = null, logics = null, - settings, + effectiveSettings, materializer) override def name: String = "Fusing GraphStages phase" @@ -640,8 +639,8 @@ final class GraphStageIsland( // subflowFuser(shell) // } else { val props = ActorGraphInterpreter.props(shell) - // TODO: actor names - materializer.actorOf(props, "fused" + Random.nextInt(), settings.dispatcher) + .withDispatcher(effectiveSettings.dispatcher) + materializer.actorOf(props, islandName) // } } @@ -651,12 +650,14 @@ final class GraphStageIsland( object SourceModuleIslandTag extends IslandTag -final class SourceModulePhase(materializer: PhasedFusingActorMaterializer) extends PhaseIsland[Publisher[Any]] { +final class SourceModulePhase( + materializer: PhasedFusingActorMaterializer, + islandName: String) extends PhaseIsland[Publisher[Any]] { override def name: String = s"SourceModule phase" override def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (Publisher[Any], Any) = { - // TODO: proper stage name - mod.asInstanceOf[SourceModule[Any, Any]].create(MaterializationContext(materializer, attributes, "stageName")) + mod.asInstanceOf[SourceModule[Any, Any]].create(MaterializationContext(materializer, attributes, + islandName + "-" + attributes.nameOrDefault())) } override def assignPort(in: InPort, slot: Int, logic: Publisher[Any]): Unit = () @@ -673,14 +674,15 @@ final class SourceModulePhase(materializer: PhasedFusingActorMaterializer) exten object SinkModuleIslandTag extends IslandTag -final class SinkModulePhase(materializer: PhasedFusingActorMaterializer) extends PhaseIsland[AnyRef] { +final class SinkModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String) + extends PhaseIsland[AnyRef] { override def name: String = s"SourceModule phase" var subscriberOrVirtualPublisher: AnyRef = _ override def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (AnyRef, Any) = { - // TODO: proper stage name val subAndMat = - mod.asInstanceOf[SinkModule[Any, Any]].create(MaterializationContext(materializer, attributes, "stageName")) + mod.asInstanceOf[SinkModule[Any, Any]].create(MaterializationContext(materializer, attributes, + islandName + "-" + attributes.nameOrDefault())) subscriberOrVirtualPublisher = subAndMat._1 (subscriberOrVirtualPublisher, subAndMat._2) @@ -706,7 +708,8 @@ final class SinkModulePhase(materializer: PhasedFusingActorMaterializer) extends object ProcessorModuleIslandTag extends IslandTag -final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer) extends PhaseIsland[Processor[Any, Any]] { +final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String) + extends PhaseIsland[Processor[Any, Any]] { override def name: String = "ProcessorModulePhase" private[this] var processor: Processor[Any, Any] = _ @@ -723,4 +726,4 @@ final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer) ex override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = publisher.subscribe(processor) override def onIslandReady(): Unit = () -} \ No newline at end of file +}