stream actor names, #22426 (#22465)

This commit is contained in:
Patrik Nordwall 2017-03-07 10:43:18 +01:00 committed by drewhk
parent 9a5c2fa5de
commit f6d45c1bc1
4 changed files with 64 additions and 63 deletions

View file

@ -26,36 +26,42 @@ object PhasedFusingActorMaterializer {
val Debug = false
val DefaultPhase: Phase[Any] = new Phase[Any] {
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[Any] =
new GraphStageIsland(settings, materializer).asInstanceOf[PhaseIsland[Any]]
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer,
islandName: String): PhaseIsland[Any] =
new GraphStageIsland(settings, materializer, islandName).asInstanceOf[PhaseIsland[Any]]
}
val DefaultPhases: Map[IslandTag, Phase[Any]] = Map[IslandTag, Phase[Any]](
SinkModuleIslandTag new Phase[Any] {
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[Any] =
(new SinkModulePhase(materializer)).asInstanceOf[PhaseIsland[Any]]
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer,
islandName: String): PhaseIsland[Any] =
(new SinkModulePhase(materializer, islandName)).asInstanceOf[PhaseIsland[Any]]
},
SourceModuleIslandTag new Phase[Any] {
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[Any] =
new SourceModulePhase(materializer).asInstanceOf[PhaseIsland[Any]]
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer,
islandName: String): PhaseIsland[Any] =
new SourceModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]]
},
ProcessorModuleIslandTag new Phase[Any] {
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[Any] =
new ProcessorModulePhase(materializer).asInstanceOf[PhaseIsland[Any]]
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer,
islandName: String): PhaseIsland[Any] =
new ProcessorModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]]
},
GraphStageTag DefaultPhase
)
GraphStageTag DefaultPhase)
def apply(settings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = {
val haveShutDown = new AtomicBoolean(false)
val system = actorSystemOf(context)
val materializerSettings = ActorMaterializerSettings(system)
val streamSupervisor = context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown)
.withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName())
PhasedFusingActorMaterializer(
system,
materializerSettings,
system.dispatchers,
context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()),
streamSupervisor,
haveShutDown,
FlowNames(system).name.copy("flow"))
}
@ -105,11 +111,18 @@ class IslandTracking(
val phases: Map[IslandTag, Phase[Any]],
val settings: ActorMaterializerSettings,
defaultPhase: Phase[Any],
val materializer: PhasedFusingActorMaterializer
) {
val materializer: PhasedFusingActorMaterializer,
islandNamePrefix: String) {
import PhasedFusingActorMaterializer.Debug
private var stageCounter = 0
private def nextStageName(): String = {
val s = islandNamePrefix + stageCounter
stageCounter += 1
s
}
private var currentGlobalOffset = 0
private var currentSegmentGlobalOffset = 0
private var currentIslandGlobalOffset = 0
@ -120,7 +133,7 @@ class IslandTracking(
private var segments: java.util.ArrayList[SegmentInfo] = null
private var forwardWires: java.util.ArrayList[ForwardWire] = null
private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, materializer)
private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, materializer, nextStageName())
def getCurrentPhase: PhaseIsland[Any] = currentPhase
def getCurrentOffset: Int = currentGlobalOffset
@ -135,8 +148,7 @@ class IslandTracking(
length = currentGlobalOffset - currentSegmentGlobalOffset,
globalBaseOffset = currentSegmentGlobalOffset,
relativeBaseOffset = currentSegmentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippetSlots,
currentPhase
)
currentPhase)
// Segment tracking is by demand, we only allocate this list if it is used.
// If there are no islands, then there is no need to track segments
@ -155,20 +167,8 @@ class IslandTracking(
val previousPhase = currentPhase
val previousIslandOffset = currentIslandGlobalOffset
val effectiveSettings: ActorMaterializerSettings = {
import Attributes._
import ActorAttributes._
attributes.attributeList.foldLeft(settings) { (s, attr)
attr match {
case InputBuffer(initial, max) s.withInputBuffer(initial, max)
case Dispatcher(dispatcher) s.withDispatcher(dispatcher)
case SupervisionStrategy(decider) s.withSupervisionStrategy(decider)
case _ s
}
}
}
currentPhase = phases(tag)(effectiveSettings, materializer)
val effectiveSettings = materializer.effectiveSettings(attributes)
currentPhase = phases(tag)(effectiveSettings, materializer, nextStageName())
if (Debug) println(s"Entering island starting at offset = $currentIslandGlobalOffset phase = $currentPhase")
// Resolve the phase to be used to materialize this island
@ -293,8 +293,7 @@ class IslandTracking(
from = out,
toGlobalOffset = absoluteOffset,
logic,
currentPhase
)
currentPhase)
if (Debug) println(s" wiring is forward, recording $forwardWire")
forwardWires.add(forwardWire)
@ -310,8 +309,7 @@ case class PhasedFusingActorMaterializer(
dispatchers: Dispatchers,
supervisor: ActorRef,
haveShutDown: AtomicBoolean,
flowNames: SeqActorName
) extends ExtendedActorMaterializer {
flowNames: SeqActorName) extends ExtendedActorMaterializer {
import PhasedFusingActorMaterializer._
private val _logger = Logging.getLogger(system, this)
@ -382,8 +380,7 @@ case class PhasedFusingActorMaterializer(
subflowFuser,
initialAttributes,
PhasedFusingActorMaterializer.DefaultPhase,
PhasedFusingActorMaterializer.DefaultPhases
)
PhasedFusingActorMaterializer.DefaultPhases)
}
def materialize[Mat](
@ -391,9 +388,8 @@ case class PhasedFusingActorMaterializer(
subflowFuser: GraphInterpreterShell ActorRef,
initialAttributes: Attributes,
defaultPhase: Phase[Any],
phases: Map[IslandTag, Phase[Any]]
): Mat = {
val islandTracking = new IslandTracking(phases, settings, defaultPhase, this)
phases: Map[IslandTag, Phase[Any]]): Mat = {
val islandTracking = new IslandTracking(phases, settings, defaultPhase, this, islandNamePrefix = createFlowName() + "-")
var current: Traversal = graph.traversalBuilder.traversal
@ -493,7 +489,10 @@ case class PhasedFusingActorMaterializer(
trait IslandTag
trait Phase[M] {
def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[M]
def apply(
effectiveSettings: ActorMaterializerSettings,
materializer: PhasedFusingActorMaterializer,
islandName: String): PhaseIsland[M]
}
trait PhaseIsland[M] {
@ -517,9 +516,9 @@ trait PhaseIsland[M] {
object GraphStageTag extends IslandTag
final class GraphStageIsland(
settings: ActorMaterializerSettings,
materializer: PhasedFusingActorMaterializer
) extends PhaseIsland[GraphStageLogic] {
effectiveSettings: ActorMaterializerSettings,
materializer: PhasedFusingActorMaterializer,
islandName: String) extends PhaseIsland[GraphStageLogic] {
// TODO: remove these
private val logicArrayType = Array.empty[GraphStageLogic]
private[this] val logics = new ArrayList[GraphStageLogic](64)
@ -531,7 +530,7 @@ final class GraphStageIsland(
val shell = new GraphInterpreterShell(
connections = null,
logics = null,
settings,
effectiveSettings,
materializer)
override def name: String = "Fusing GraphStages phase"
@ -640,8 +639,8 @@ final class GraphStageIsland(
// subflowFuser(shell)
// } else {
val props = ActorGraphInterpreter.props(shell)
// TODO: actor names
materializer.actorOf(props, "fused" + Random.nextInt(), settings.dispatcher)
.withDispatcher(effectiveSettings.dispatcher)
materializer.actorOf(props, islandName)
// }
}
@ -651,12 +650,14 @@ final class GraphStageIsland(
object SourceModuleIslandTag extends IslandTag
final class SourceModulePhase(materializer: PhasedFusingActorMaterializer) extends PhaseIsland[Publisher[Any]] {
final class SourceModulePhase(
materializer: PhasedFusingActorMaterializer,
islandName: String) extends PhaseIsland[Publisher[Any]] {
override def name: String = s"SourceModule phase"
override def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (Publisher[Any], Any) = {
// TODO: proper stage name
mod.asInstanceOf[SourceModule[Any, Any]].create(MaterializationContext(materializer, attributes, "stageName"))
mod.asInstanceOf[SourceModule[Any, Any]].create(MaterializationContext(materializer, attributes,
islandName + "-" + attributes.nameOrDefault()))
}
override def assignPort(in: InPort, slot: Int, logic: Publisher[Any]): Unit = ()
@ -673,14 +674,15 @@ final class SourceModulePhase(materializer: PhasedFusingActorMaterializer) exten
object SinkModuleIslandTag extends IslandTag
final class SinkModulePhase(materializer: PhasedFusingActorMaterializer) extends PhaseIsland[AnyRef] {
final class SinkModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String)
extends PhaseIsland[AnyRef] {
override def name: String = s"SourceModule phase"
var subscriberOrVirtualPublisher: AnyRef = _
override def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (AnyRef, Any) = {
// TODO: proper stage name
val subAndMat =
mod.asInstanceOf[SinkModule[Any, Any]].create(MaterializationContext(materializer, attributes, "stageName"))
mod.asInstanceOf[SinkModule[Any, Any]].create(MaterializationContext(materializer, attributes,
islandName + "-" + attributes.nameOrDefault()))
subscriberOrVirtualPublisher = subAndMat._1
(subscriberOrVirtualPublisher, subAndMat._2)
@ -706,7 +708,8 @@ final class SinkModulePhase(materializer: PhasedFusingActorMaterializer) extends
object ProcessorModuleIslandTag extends IslandTag
final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer) extends PhaseIsland[Processor[Any, Any]] {
final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String)
extends PhaseIsland[Processor[Any, Any]] {
override def name: String = "ProcessorModulePhase"
private[this] var processor: Processor[Any, Any] = _
@ -723,4 +726,4 @@ final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer) ex
override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = publisher.subscribe(processor)
override def onIslandReady(): Unit = ()
}
}