=str - Minor touchups to GraphInterpreter and GraphAssembly

This commit is contained in:
Viktor Klang 2015-10-26 10:48:22 +01:00
parent 630bd948d5
commit b8c3a5c664
4 changed files with 77 additions and 49 deletions

View file

@ -3,11 +3,13 @@
*/
package akka.stream.impl.fusing
import java.util.Arrays
import akka.event.LoggingAdapter
import akka.io.Tcp.Closed
import akka.stream.stage._
import akka.stream.{ Materializer, Shape, Inlet, Outlet }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.control.NonFatal
/**
@ -85,13 +87,14 @@ private[stream] object GraphInterpreter {
* corresponding segments of these arrays matches the exact same order of the ports in the [[Shape]].
*
*/
final case class GraphAssembly(stages: Array[GraphStageWithMaterializedValue[_, _]],
ins: Array[Inlet[_]],
inOwners: Array[Int],
outs: Array[Outlet[_]],
outOwners: Array[Int]) {
final class GraphAssembly(val stages: Array[GraphStageWithMaterializedValue[_, _]],
val ins: Array[Inlet[_]],
val inOwners: Array[Int],
val outs: Array[Outlet[_]],
val outOwners: Array[Int]) {
require(ins.length == inOwners.length && inOwners.length == outs.length && outs.length == outOwners.length)
val connectionCount: Int = ins.length
def connectionCount: Int = ins.length
/**
* Takes an interpreter and returns three arrays required by the interpreter containing the input, output port
@ -175,6 +178,42 @@ private[stream] object GraphInterpreter {
outOwners.mkString("[", ",", "]") +
")"
}
object GraphAssembly {
/**
* INTERNAL API
*/
final def apply(inlets: immutable.Seq[Inlet[_]],
outlets: immutable.Seq[Outlet[_]],
stages: GraphStageWithMaterializedValue[_, _]*): GraphAssembly = {
// add the contents of an iterator to an array starting at idx
@tailrec def add[T](i: Iterator[T], a: Array[T], idx: Int): Array[T] =
if (i.hasNext) {
a(idx) = i.next()
add(i, a, idx + 1)
} else a
// fill array slots with Boundary
def markBoundary(owners: Array[Int], from: Int, to: Int): Array[Int] = {
Arrays.fill(owners, from, to, Boundary)
owners
}
val inletsSize = inlets.size
val outletsSize = outlets.size
val connectionCount = inletsSize + outletsSize
require(connectionCount > 0, s"sum of inlets ({$inletsSize}) & outlets ({$outletsSize}) must be > 0")
val assembly = new GraphAssembly(
stages.toArray,
add(inlets.iterator, Array.ofDim(connectionCount), 0),
markBoundary(Array.ofDim(connectionCount), inletsSize, connectionCount),
add(outlets.iterator, Array.ofDim(connectionCount), inletsSize),
markBoundary(Array.ofDim(connectionCount), 0, inletsSize))
assembly
}
}
}
/**
@ -367,13 +406,17 @@ private[stream] final class GraphInterpreter(
// Debug name for a connections input part
private def inOwnerName(connection: Int): String =
if (assembly.inOwners(connection) == Boundary) "DownstreamBoundary"
else assembly.stages(assembly.inOwners(connection)).toString
assembly.inOwners(connection) match {
case Boundary "DownstreamBoundary"
case owner assembly.stages(owner).toString
}
// Debug name for a connections ouput part
private def outOwnerName(connection: Int): String =
if (assembly.outOwners(connection) == Boundary) "UpstreamBoundary"
else assembly.stages(assembly.outOwners(connection)).toString
assembly.outOwners(connection) match {
case Boundary "UpstreamBoundary"
case owner assembly.stages(owner).toString
}
/**
* Executes pending events until the given limit is met. If there were remaining events, isSuspended will return
@ -512,28 +555,28 @@ private[stream] final class GraphInterpreter(
}
private[stream] def pull(connection: Int): Unit = {
if ((portStates(connection) & OutClosed) == 0) {
portStates(connection) ^= PullStartFlip
val currentState = portStates(connection)
if ((currentState & OutClosed) == 0) {
portStates(connection) = currentState ^ PullStartFlip
enqueue(connection)
}
}
private[stream] def complete(connection: Int): Unit = {
val currentState = portStates(connection)
portStates(connection) = portStates(connection) | OutClosed
if ((currentState & InClosed) == 0) {
if ((currentState & Pushing) != 0) {} // FIXME: Fold into previous condition
else if (connectionSlots(connection) != Empty)
enqueue(connection)
else
enqueue(connection)
portStates(connection) = currentState | OutClosed
if ((currentState & (InClosed | Pushing)) == 0) {
enqueue(connection)
}
completeConnection(assembly.outOwners(connection))
}
private[stream] def fail(connection: Int, ex: Throwable): Unit = {
portStates(connection) |= (OutClosed | InFailed)
if ((portStates(connection) & InClosed) == 0) {
val currentState = portStates(connection)
portStates(connection) = currentState | (OutClosed | InFailed)
if ((currentState & InClosed) == 0) {
connectionSlots(connection) = Failed(ex, connectionSlots(connection))
enqueue(connection)
}
@ -542,8 +585,9 @@ private[stream] final class GraphInterpreter(
}
private[stream] def cancel(connection: Int): Unit = {
portStates(connection) |= InClosed
if ((portStates(connection) & OutClosed) == 0) {
val currentState = portStates(connection)
portStates(connection) = currentState | InClosed
if ((currentState & OutClosed) == 0) {
connectionSlots(connection) = Empty
enqueue(connection)
}