diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index e73e2d3ced..35665c0d68 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -112,7 +112,7 @@ private final case class ForwardWire( override def toString: String = s"ForwardWire(islandId = $islandGlobalOffset, from = $from, toGlobal = $toGlobalOffset, phase = $phase)" } -private final case class SavedIslandData(islandGlobalOffset: Int, skippedSlots: Int, phase: PhaseIsland[Any]) +private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOffset: Int, skippedSlots: Int, phase: PhaseIsland[Any]) class IslandTracking( val phases: Map[IslandTag, Phase[Any]], @@ -147,7 +147,7 @@ class IslandTracking( def getCurrentPhase: PhaseIsland[Any] = currentPhase def getCurrentOffset: Int = currentGlobalOffset - private def completeSegment(): Int = { + private def completeSegment(): Unit = { val length = currentGlobalOffset - currentSegmentGlobalOffset if (activePhases eq null) { @@ -172,26 +172,25 @@ class IslandTracking( } else { if (Debug) println(s"Skipped zero length segment") } - - length } def enterIsland(tag: IslandTag, attributes: Attributes): Unit = { completeSegment() val previousPhase = currentPhase val previousIslandOffset = currentIslandGlobalOffset + islandStateStack.add(SavedIslandData(previousIslandOffset, currentGlobalOffset, currentIslandSkippetSlots, previousPhase)) val effectiveSettings = materializer.effectiveSettings(attributes) currentPhase = phases(tag)(effectiveSettings, materializer, nextIslandName()) activePhases.add(currentPhase) - if (Debug) println(s"Entering island starting at offset = $currentIslandGlobalOffset phase = $currentPhase") // Resolve the phase to be used to materialize this island currentIslandGlobalOffset = currentGlobalOffset // The base offset of this segment is the current global offset currentSegmentGlobalOffset = currentGlobalOffset - islandStateStack.add(SavedIslandData(previousIslandOffset, currentIslandSkippetSlots, previousPhase)) + currentIslandSkippetSlots = 0 + if (Debug) println(s"Entering island starting at offset = $currentIslandGlobalOffset phase = $currentPhase") } def exitIsland(): Unit = { @@ -204,7 +203,7 @@ class IslandTracking( // We restore data for the island currentIslandGlobalOffset = parentIsland.islandGlobalOffset currentPhase = parentIsland.phase - currentIslandSkippetSlots = parentIsland.skippedSlots + previousSegmentLength + currentIslandSkippetSlots = parentIsland.skippedSlots + (currentGlobalOffset - parentIsland.lastVisitedOffset) if (Debug) println(s"Exited to island starting at offset = $currentIslandGlobalOffset phase = $currentPhase") } @@ -253,7 +252,6 @@ class IslandTracking( } def wireOut(out: OutPort, absoluteOffset: Int, logic: Any): Unit = { - // TODO: forward wires if (Debug) println(s" wiring $out to absolute = $absoluteOffset") // First check if we are wiring backwards. This is important since we can only do resolution for backward wires. @@ -760,10 +758,10 @@ final class TlsModulePhase(settings: ActorMaterializerSettings, materializer: Ph var publishers: Vector[ActorPublisher[Any]] = _ def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (NotUsed, Any) = { - println(mod) val tls = mod.asInstanceOf[TlsModule] - val props = TLSActor.props(settings, tls.createSSLEngine, tls.verifySession, tls.closing) + val props = + TLSActor.props(settings, tls.createSSLEngine, tls.verifySession, tls.closing).withDispatcher(settings.dispatcher) tlsActor = materializer.actorOf(props, islandName) def factory(id: Int) = new ActorPublisher[Any](tlsActor) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) @@ -779,7 +777,7 @@ final class TlsModulePhase(settings: ActorMaterializerSettings, materializer: Ph publishers(out.id) def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = - publisher.subscribe(FanIn.SubInput[Any](tlsActor, slot)) + publisher.subscribe(FanIn.SubInput[Any](tlsActor, 1 - slot)) def onIslandReady(): Unit = () } diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 66b7abbaee..6288c352a6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -264,8 +264,11 @@ sealed trait TraversalBuilder { protected def internalSetAttributes(attributes: Attributes): TraversalBuilder def setAttributes(attributes: Attributes): TraversalBuilder = { - if (attributes.isAsync) this.makeIsland(GraphStageTag).internalSetAttributes(attributes) - else internalSetAttributes(attributes) + if (attributes ne Attributes.none) { + if (attributes.isAsync) this.makeIsland(GraphStageTag).internalSetAttributes(attributes) + else internalSetAttributes(attributes) + } else + this } def attributes: Attributes @@ -544,7 +547,8 @@ object LinearTraversalBuilder { addMatCompose(PushNotUsed, combine), pendingBuilder = Some(composite), Attributes.none, - beforeBuilder = if (inOpt.isDefined) PushAttributes(composite.attributes) else EmptyTraversal) + beforeBuilder = + if (inOpt.isDefined && (composite.attributes ne Attributes.none)) PushAttributes(composite.attributes) else EmptyTraversal) } } @@ -585,8 +589,11 @@ final case class LinearTraversalBuilder( copy(attributes = attributes) override def setAttributes(attributes: Attributes): LinearTraversalBuilder = { - if (attributes.isAsync) this.makeIsland(GraphStageTag).internalSetAttributes(attributes) - else internalSetAttributes(attributes) + if (attributes ne Attributes.none) { + if (attributes.isAsync) this.makeIsland(GraphStageTag).internalSetAttributes(attributes) + else internalSetAttributes(attributes) + } else + this } private def applyIslandAndAttributes(t: Traversal): Traversal = { @@ -628,7 +635,7 @@ final case class LinearTraversalBuilder( .assign(out, inOffset - composite.offsetOfModule(out)) .traversal .concat(traversalSoFar), - pendingBuilder = None) + pendingBuilder = None, beforeBuilder = EmptyTraversal) case None ⇒ copy(inPort = None, outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) } @@ -666,7 +673,9 @@ final case class LinearTraversalBuilder( .assign(out, relativeSlot) .traversal .concat(traversalSoFar), - pendingBuilder = None) + pendingBuilder = None, + beforeBuilder = EmptyTraversal + ) case None ⇒ copy(outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) } @@ -720,15 +729,21 @@ final case class LinearTraversalBuilder( toAppend.traversal .concat(LinearTraversalBuilder.addMatCompose(traversalWithWiringCorrected, matCompose)) else { + val withAttributes = + if (toAppend.attributes ne Attributes.none) { + PopAttributes + .concat(LinearTraversalBuilder.addMatCompose(traversalWithWiringCorrected, matCompose)) + } else + LinearTraversalBuilder.addMatCompose(traversalWithWiringCorrected, matCompose) + if (toAppend.islandTag.isEmpty) { toAppend.traversalSoFar - .concat(PopAttributes) - .concat(LinearTraversalBuilder.addMatCompose(traversalWithWiringCorrected, matCompose)) + .concat(withAttributes) + } else { toAppend.traversalSoFar .concat(ExitIsland) - .concat(PopAttributes) - .concat(LinearTraversalBuilder.addMatCompose(traversalWithWiringCorrected, matCompose)) + .concat(withAttributes) } } @@ -736,10 +751,12 @@ final case class LinearTraversalBuilder( if (toAppend.pendingBuilder.isEmpty) { EmptyTraversal } else { - toAppend.islandTag match { - case None ⇒ PushAttributes(toAppend.attributes) - case Some(tag) ⇒ PushAttributes(toAppend.attributes).concat(EnterIsland(tag)) - } + if (toAppend.attributes ne Attributes.none) { + toAppend.islandTag match { + case None ⇒ PushAttributes(toAppend.attributes) + case Some(tag) ⇒ PushAttributes(toAppend.attributes).concat(EnterIsland(tag)) + } + } else EmptyTraversal } @@ -753,7 +770,8 @@ final case class LinearTraversalBuilder( traversalSoFar = newTraversal, pendingBuilder = toAppend.pendingBuilder, attributes = Attributes.none, - beforeBuilder = newBeforeBuilder + beforeBuilder = newBeforeBuilder.concat(toAppend.beforeBuilder), + islandTag = None ) } else throw new Exception("should this happen?") diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index 9d740bbd9f..97e72f7bea 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -56,13 +56,21 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat]( * flow into the materialized value of the resulting BidiFlow. */ def atopMat[OO1, II2, Mat2, M](bidi: Graph[BidiShape[O1, OO1, II2, I2], Mat2])(combine: (Mat, Mat2) ⇒ M): BidiFlow[I1, OO1, II2, O2, M] = { - val newBidiShape = bidi.shape.deepCopy() + val newBidi1Shape = shape.deepCopy() + val newBidi2Shape = bidi.shape.deepCopy() + + // We MUST add the current module as an explicit submodule. The composite builder otherwise *grows* the + // existing module, which is not good if there are islands present (the new module will "join" the island). + val newTraversalBuilder = + TraversalBuilder.empty() + .add(traversalBuilder, newBidi1Shape, Keep.right) + .add(bidi.traversalBuilder, newBidi2Shape, combine) + .wire(newBidi1Shape.out1, newBidi2Shape.in1) + .wire(newBidi2Shape.out2, newBidi1Shape.in2) new BidiFlow( - traversalBuilder.add(bidi.traversalBuilder, newBidiShape, combine) - .wire(shape.out1, newBidiShape.in1) - .wire(newBidiShape.out2, shape.in2), - BidiShape(shape.in1, newBidiShape.out1, newBidiShape.in2, shape.out2) + newTraversalBuilder, + BidiShape(newBidi1Shape.in1, newBidi2Shape.out1, newBidi2Shape.in2, newBidi1Shape.out2) ) } @@ -106,14 +114,18 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat]( * flow into the materialized value of the resulting [[Flow]]. */ def joinMat[Mat2, M](flow: Graph[FlowShape[O1, I2], Mat2])(combine: (Mat, Mat2) ⇒ M): Flow[I1, O2, M] = { + val newBidiShape = shape.deepCopy() val newFlowShape = flow.shape.deepCopy() - val resultBuilder = traversalBuilder + // We MUST add the current module as an explicit submodule. The composite builder otherwise *grows* the + // existing module, which is not good if there are islands present (the new module will "join" the island). + val resultBuilder = TraversalBuilder.empty() + .add(traversalBuilder, newBidiShape, Keep.right) .add(flow.traversalBuilder, newFlowShape, combine) - .wire(shape.out1, newFlowShape.in) - .wire(newFlowShape.out, shape.in2) + .wire(newBidiShape.out1, newFlowShape.in) + .wire(newFlowShape.out, newBidiShape.in2) - val newShape = FlowShape(shape.in1, shape.out2) + val newShape = FlowShape(newBidiShape.in1, newBidiShape.out2) new Flow( LinearTraversalBuilder.fromBuilder(resultBuilder, newShape, Keep.right),