diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala index fed8396499..ed0184b37c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala @@ -43,7 +43,7 @@ class GraphInterpreterSpec extends GraphInterpreterSpecKit { val sink = new DownstreamProbe[Int]("sink") // Constructing an assembly by hand and resolving ambiguities - val assembly = GraphAssembly( + val assembly = new GraphAssembly( stages = Array(identity, identity), ins = Array(identity.in, identity.in, null), inOwners = Array(0, 1, -1), diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index 273583ba91..e89666735b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -60,7 +60,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec { val inOwners = ins.map { in ⇒ stages.indexWhere(_.shape.inlets.contains(in)) } val outOwners = outs.map { out ⇒ stages.indexWhere(_.shape.outlets.contains(out)) } - val assembly = GraphAssembly( + val assembly = new GraphAssembly( stages.toArray, (ins ++ Vector.fill(downstreams.size)(null)).toArray, (inOwners ++ Vector.fill(downstreams.size)(-1)).toArray, @@ -171,7 +171,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec { }) } - private val assembly = GraphAssembly( + private val assembly = new GraphAssembly( stages = Array.empty, ins = Array(null), inOwners = Array(-1), diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index dbe7b639a9..e87e093818 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -3,11 +3,13 @@ */ package akka.stream.impl.fusing +import java.util.Arrays + import akka.event.LoggingAdapter -import akka.io.Tcp.Closed import akka.stream.stage._ import akka.stream.{ Materializer, Shape, Inlet, Outlet } - +import scala.annotation.tailrec +import scala.collection.immutable import scala.util.control.NonFatal /** @@ -85,13 +87,14 @@ private[stream] object GraphInterpreter { * corresponding segments of these arrays matches the exact same order of the ports in the [[Shape]]. * */ - final case class GraphAssembly(stages: Array[GraphStageWithMaterializedValue[_, _]], - ins: Array[Inlet[_]], - inOwners: Array[Int], - outs: Array[Outlet[_]], - outOwners: Array[Int]) { + final class GraphAssembly(val stages: Array[GraphStageWithMaterializedValue[_, _]], + val ins: Array[Inlet[_]], + val inOwners: Array[Int], + val outs: Array[Outlet[_]], + val outOwners: Array[Int]) { + require(ins.length == inOwners.length && inOwners.length == outs.length && outs.length == outOwners.length) - val connectionCount: Int = ins.length + def connectionCount: Int = ins.length /** * Takes an interpreter and returns three arrays required by the interpreter containing the input, output port @@ -175,6 +178,42 @@ private[stream] object GraphInterpreter { outOwners.mkString("[", ",", "]") + ")" } + + object GraphAssembly { + /** + * INTERNAL API + */ + final def apply(inlets: immutable.Seq[Inlet[_]], + outlets: immutable.Seq[Outlet[_]], + stages: GraphStageWithMaterializedValue[_, _]*): GraphAssembly = { + // add the contents of an iterator to an array starting at idx + @tailrec def add[T](i: Iterator[T], a: Array[T], idx: Int): Array[T] = + if (i.hasNext) { + a(idx) = i.next() + add(i, a, idx + 1) + } else a + + // fill array slots with Boundary + def markBoundary(owners: Array[Int], from: Int, to: Int): Array[Int] = { + Arrays.fill(owners, from, to, Boundary) + owners + } + + val inletsSize = inlets.size + val outletsSize = outlets.size + val connectionCount = inletsSize + outletsSize + require(connectionCount > 0, s"sum of inlets ({$inletsSize}) & outlets ({$outletsSize}) must be > 0") + + val assembly = new GraphAssembly( + stages.toArray, + add(inlets.iterator, Array.ofDim(connectionCount), 0), + markBoundary(Array.ofDim(connectionCount), inletsSize, connectionCount), + add(outlets.iterator, Array.ofDim(connectionCount), inletsSize), + markBoundary(Array.ofDim(connectionCount), 0, inletsSize)) + + assembly + } + } } /** @@ -367,13 +406,17 @@ private[stream] final class GraphInterpreter( // Debug name for a connections input part private def inOwnerName(connection: Int): String = - if (assembly.inOwners(connection) == Boundary) "DownstreamBoundary" - else assembly.stages(assembly.inOwners(connection)).toString + assembly.inOwners(connection) match { + case Boundary ⇒ "DownstreamBoundary" + case owner ⇒ assembly.stages(owner).toString + } // Debug name for a connections ouput part private def outOwnerName(connection: Int): String = - if (assembly.outOwners(connection) == Boundary) "UpstreamBoundary" - else assembly.stages(assembly.outOwners(connection)).toString + assembly.outOwners(connection) match { + case Boundary ⇒ "UpstreamBoundary" + case owner ⇒ assembly.stages(owner).toString + } /** * Executes pending events until the given limit is met. If there were remaining events, isSuspended will return @@ -512,28 +555,28 @@ private[stream] final class GraphInterpreter( } private[stream] def pull(connection: Int): Unit = { - if ((portStates(connection) & OutClosed) == 0) { - portStates(connection) ^= PullStartFlip + val currentState = portStates(connection) + if ((currentState & OutClosed) == 0) { + portStates(connection) = currentState ^ PullStartFlip enqueue(connection) } } private[stream] def complete(connection: Int): Unit = { val currentState = portStates(connection) - portStates(connection) = portStates(connection) | OutClosed - if ((currentState & InClosed) == 0) { - if ((currentState & Pushing) != 0) {} // FIXME: Fold into previous condition - else if (connectionSlots(connection) != Empty) - enqueue(connection) - else - enqueue(connection) + portStates(connection) = currentState | OutClosed + + if ((currentState & (InClosed | Pushing)) == 0) { + enqueue(connection) } + completeConnection(assembly.outOwners(connection)) } private[stream] def fail(connection: Int, ex: Throwable): Unit = { - portStates(connection) |= (OutClosed | InFailed) - if ((portStates(connection) & InClosed) == 0) { + val currentState = portStates(connection) + portStates(connection) = currentState | (OutClosed | InFailed) + if ((currentState & InClosed) == 0) { connectionSlots(connection) = Failed(ex, connectionSlots(connection)) enqueue(connection) } @@ -542,8 +585,9 @@ private[stream] final class GraphInterpreter( } private[stream] def cancel(connection: Int): Unit = { - portStates(connection) |= InClosed - if ((portStates(connection) & OutClosed) == 0) { + val currentState = portStates(connection) + portStates(connection) = currentState | InClosed + if ((currentState & OutClosed) == 0) { connectionSlots(connection) = Empty enqueue(connection) } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 10b8873200..fdc3bc5e7c 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -16,27 +16,11 @@ abstract class GraphStageWithMaterializedValue[S <: Shape, M] extends Graph[S, M def shape: S def createLogicAndMaterializedValue: (GraphStageLogic, M) - final override private[stream] lazy val module: Module = { - val connectionCount = shape.inlets.size + shape.outlets.size - val assembly = GraphAssembly( - Array(this), - Array.ofDim(connectionCount), - Array.fill(connectionCount)(-1), - Array.ofDim(connectionCount), - Array.fill(connectionCount)(-1)) - - for ((inlet, i) ← shape.inlets.iterator.zipWithIndex) { - assembly.ins(i) = inlet - assembly.inOwners(i) = 0 - } - - for ((outlet, i) ← shape.outlets.iterator.zipWithIndex) { - assembly.outs(i + shape.inlets.size) = outlet - assembly.outOwners(i + shape.inlets.size) = 0 - } - - GraphModule(assembly, shape, Attributes.none) - } + final override private[stream] lazy val module: Module = + GraphModule( + GraphAssembly(shape.inlets, shape.outlets, Array(this): _*), + shape, + Attributes.none) /** * This method throws an [[UnsupportedOperationException]] by default. The subclass can override this method