Wip fix islands drewhk (#22475)
* Fixed island application * Switched to explicit island boundaries and fixed misapplied island tags * Added missing attributes for non-stage backed Sinks
This commit is contained in:
parent
37610e43bb
commit
87d5b8f627
8 changed files with 301 additions and 66 deletions
|
|
@ -84,7 +84,7 @@ object PhasedFusingActorMaterializer {
|
|||
|
||||
}
|
||||
|
||||
private case class SegmentInfo(
|
||||
private final case class SegmentInfo(
|
||||
globalislandOffset: Int, // The island to which the segment belongs
|
||||
length: Int, // How many slots are contained by the segment
|
||||
globalBaseOffset: Int, // The global slot where this segment starts
|
||||
|
|
@ -102,7 +102,7 @@ private case class SegmentInfo(
|
|||
""".stripMargin
|
||||
}
|
||||
|
||||
private case class ForwardWire(
|
||||
private final case class ForwardWire(
|
||||
islandGlobalOffset: Int,
|
||||
from: OutPort,
|
||||
toGlobalOffset: Int,
|
||||
|
|
@ -112,6 +112,8 @@ private case class ForwardWire(
|
|||
override def toString: String = s"ForwardWire(islandId = $islandGlobalOffset, from = $from, toGlobal = $toGlobalOffset, phase = $phase)"
|
||||
}
|
||||
|
||||
private final case class SavedIslandData(islandGlobalOffset: Int, skippedSlots: Int, phase: PhaseIsland[Any])
|
||||
|
||||
class IslandTracking(
|
||||
val phases: Map[IslandTag, Phase[Any]],
|
||||
val settings: ActorMaterializerSettings,
|
||||
|
|
@ -138,6 +140,7 @@ class IslandTracking(
|
|||
private var segments: java.util.ArrayList[SegmentInfo] = null
|
||||
private var activePhases: java.util.ArrayList[PhaseIsland[Any]] = null
|
||||
private var forwardWires: java.util.ArrayList[ForwardWire] = null
|
||||
private var islandStateStack: java.util.ArrayList[SavedIslandData] = null
|
||||
|
||||
private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, materializer, nextIslandName())
|
||||
|
||||
|
|
@ -147,7 +150,10 @@ class IslandTracking(
|
|||
private def completeSegment(): Int = {
|
||||
val length = currentGlobalOffset - currentSegmentGlobalOffset
|
||||
|
||||
if (activePhases eq null) activePhases = new util.ArrayList[PhaseIsland[Any]](8)
|
||||
if (activePhases eq null) {
|
||||
activePhases = new util.ArrayList[PhaseIsland[Any]](8)
|
||||
islandStateStack = new java.util.ArrayList[SavedIslandData](4)
|
||||
}
|
||||
|
||||
if (length > 0) {
|
||||
// We just finished a segment by entering an island.
|
||||
|
|
@ -170,7 +176,7 @@ class IslandTracking(
|
|||
length
|
||||
}
|
||||
|
||||
def enterIsland(tag: IslandTag, attributes: Attributes): ExitIsland = {
|
||||
def enterIsland(tag: IslandTag, attributes: Attributes): Unit = {
|
||||
completeSegment()
|
||||
val previousPhase = currentPhase
|
||||
val previousIslandOffset = currentIslandGlobalOffset
|
||||
|
|
@ -185,19 +191,20 @@ class IslandTracking(
|
|||
|
||||
// The base offset of this segment is the current global offset
|
||||
currentSegmentGlobalOffset = currentGlobalOffset
|
||||
ExitIsland(previousIslandOffset, currentIslandSkippetSlots, previousPhase)
|
||||
islandStateStack.add(SavedIslandData(previousIslandOffset, currentIslandSkippetSlots, previousPhase))
|
||||
}
|
||||
|
||||
def exitIsland(exitIsland: ExitIsland): Unit = {
|
||||
def exitIsland(): Unit = {
|
||||
val parentIsland = islandStateStack.remove(islandStateStack.size() - 1)
|
||||
val previousSegmentLength = completeSegment()
|
||||
|
||||
// We start a new segment
|
||||
currentSegmentGlobalOffset = currentGlobalOffset
|
||||
|
||||
// We restore data for the island
|
||||
currentIslandGlobalOffset = exitIsland.islandGlobalOffset
|
||||
currentPhase = exitIsland.phase
|
||||
currentIslandSkippetSlots = exitIsland.skippedSlots + previousSegmentLength
|
||||
currentIslandGlobalOffset = parentIsland.islandGlobalOffset
|
||||
currentPhase = parentIsland.phase
|
||||
currentIslandSkippetSlots = parentIsland.skippedSlots + previousSegmentLength
|
||||
|
||||
if (Debug) println(s"Exited to island starting at offset = $currentIslandGlobalOffset phase = $currentPhase")
|
||||
}
|
||||
|
|
@ -410,7 +417,7 @@ case class PhasedFusingActorMaterializer(
|
|||
var current: Traversal = graph.traversalBuilder.traversal
|
||||
|
||||
val attributesStack = new java.util.ArrayDeque[Attributes](8)
|
||||
attributesStack.addLast(initialAttributes)
|
||||
attributesStack.addLast(initialAttributes and graph.traversalBuilder.attributes)
|
||||
|
||||
// TODO: No longer need for a stack
|
||||
val traversalStack = new java.util.ArrayDeque[Traversal](16)
|
||||
|
|
@ -483,11 +490,10 @@ case class PhasedFusingActorMaterializer(
|
|||
case PopAttributes ⇒
|
||||
attributesStack.removeLast()
|
||||
if (Debug) println(s"ATTR POP")
|
||||
case EnterIsland(tag, island) ⇒
|
||||
traversalStack.addLast(islandTracking.enterIsland(tag, attributesStack.getLast))
|
||||
nextStep = island
|
||||
case ex: ExitIsland ⇒
|
||||
islandTracking.exitIsland(ex)
|
||||
case EnterIsland(tag) ⇒
|
||||
islandTracking.enterIsland(tag, attributesStack.getLast)
|
||||
case ExitIsland ⇒
|
||||
islandTracking.exitIsland()
|
||||
case _ ⇒
|
||||
}
|
||||
current = nextStep
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue