Fix too early call to onIslandReady (#22468)
This commit is contained in:
parent
f6d45c1bc1
commit
c7c109db4a
1 changed files with 16 additions and 3 deletions
|
|
@ -1,5 +1,6 @@
|
|||
package akka.stream.impl
|
||||
|
||||
import java.util
|
||||
import java.util.ArrayList
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
|
|
@ -131,6 +132,7 @@ class IslandTracking(
|
|||
private var currentIslandSkippetSlots = 0
|
||||
|
||||
private var segments: java.util.ArrayList[SegmentInfo] = null
|
||||
private var activePhases: java.util.ArrayList[PhaseIsland[Any]] = null
|
||||
private var forwardWires: java.util.ArrayList[ForwardWire] = null
|
||||
|
||||
private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, materializer, nextStageName())
|
||||
|
|
@ -141,6 +143,8 @@ class IslandTracking(
|
|||
private def completeSegment(): Int = {
|
||||
val length = currentGlobalOffset - currentSegmentGlobalOffset
|
||||
|
||||
if (activePhases eq null) activePhases = new util.ArrayList[PhaseIsland[Any]](8)
|
||||
|
||||
if (length > 0) {
|
||||
// We just finished a segment by entering an island.
|
||||
val previousSegment = SegmentInfo(
|
||||
|
|
@ -169,6 +173,7 @@ class IslandTracking(
|
|||
|
||||
val effectiveSettings = materializer.effectiveSettings(attributes)
|
||||
currentPhase = phases(tag)(effectiveSettings, materializer, nextStageName())
|
||||
activePhases.add(currentPhase)
|
||||
if (Debug) println(s"Entering island starting at offset = $currentIslandGlobalOffset phase = $currentPhase")
|
||||
|
||||
// Resolve the phase to be used to materialize this island
|
||||
|
|
@ -182,9 +187,6 @@ class IslandTracking(
|
|||
def exitIsland(exitIsland: ExitIsland): Unit = {
|
||||
val previousSegmentLength = completeSegment()
|
||||
|
||||
// Closing previous island
|
||||
currentPhase.onIslandReady()
|
||||
|
||||
// We start a new segment
|
||||
currentSegmentGlobalOffset = currentGlobalOffset
|
||||
|
||||
|
|
@ -301,6 +303,16 @@ class IslandTracking(
|
|||
|
||||
}
|
||||
|
||||
def allNestedIslandsReady(): Unit = {
|
||||
if (activePhases ne null) {
|
||||
var i = 0
|
||||
while (i < activePhases.size()) {
|
||||
activePhases.get(i).onIslandReady()
|
||||
i += 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
case class PhasedFusingActorMaterializer(
|
||||
|
|
@ -479,6 +491,7 @@ case class PhasedFusingActorMaterializer(
|
|||
}
|
||||
|
||||
islandTracking.getCurrentPhase.onIslandReady()
|
||||
islandTracking.allNestedIslandsReady()
|
||||
|
||||
if (Debug) println("--- Finished materialization")
|
||||
matValueStack.peekLast().asInstanceOf[Mat]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue