+str: fusable fan-in and fan-out ops

This commit is contained in:
Endre Sándor Varga 2015-09-18 17:17:25 +02:00
parent cc661409f9
commit 26680c3477
18 changed files with 456 additions and 629 deletions

View file

@ -3,16 +3,15 @@
*/
package akka.stream.scaladsl
import akka.stream.impl.Junctions._
import akka.stream.impl.GenJunctions._
import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule }
import akka.stream.impl._
import akka.stream.impl.StreamLayout._
import akka.stream._
import Attributes.name
import scala.collection.immutable
import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage }
import scala.annotation.unchecked.uncheckedVariance
import scala.annotation.tailrec
import scala.collection.immutable
object Merge {
/**
@ -20,10 +19,7 @@ object Merge {
*
* @param inputPorts number of input ports
*/
def apply[T](inputPorts: Int): Merge[T] = {
val shape = new UniformFanInShape[T, T](inputPorts)
new Merge(inputPorts, shape, new MergeModule(shape, Attributes.name("Merge")))
}
def apply[T](inputPorts: Int): Merge[T] = new Merge(inputPorts)
}
@ -39,15 +35,68 @@ object Merge {
*
* '''Cancels when''' downstream cancels
*/
class Merge[T] private (inputPorts: Int,
override val shape: UniformFanInShape[T, T],
private[stream] override val module: StreamLayout.Module)
extends Graph[UniformFanInShape[T, T], Unit] {
class Merge[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] {
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i Inlet[T]("Merge.in" + i))
val out: Outlet[T] = Outlet[T]("Merge.out")
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
override def withAttributes(attr: Attributes): Merge[T] =
new Merge(inputPorts, shape, module.withAttributes(attr).nest())
override def createLogic: GraphStageLogic = new GraphStageLogic {
private var initialized = false
override def named(name: String): Merge[T] = withAttributes(Attributes.name(name))
private val pendingQueue = Array.ofDim[Inlet[T]](inputPorts)
private var pendingHead = 0
private var pendingTail = 0
private var runningUpstreams = inputPorts
private def upstreamsClosed = runningUpstreams == 0
private def pending: Boolean = pendingHead != pendingTail
private def enqueue(in: Inlet[T]): Unit = {
pendingQueue(pendingTail % inputPorts) = in
pendingTail += 1
}
private def dequeueAndDispatch(): Unit = {
val in = pendingQueue(pendingHead % inputPorts)
pendingHead += 1
push(out, grab(in))
if (upstreamsClosed && !pending) completeStage()
else tryPull(in)
}
private def tryPull(in: Inlet[T]): Unit = if (!isClosed(in)) pull(in)
in.foreach { i
setHandler(i, new InHandler {
override def onPush(): Unit = {
if (isAvailable(out)) {
if (!pending) {
push(out, grab(i))
tryPull(i)
}
} else enqueue(i)
}
override def onUpstreamFinish() = {
runningUpstreams -= 1
if (upstreamsClosed && !pending) completeStage()
}
})
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (!initialized) {
initialized = true
in.foreach(tryPull)
} else if (pending)
dequeueAndDispatch()
}
})
}
override def toString = "Merge"
}
object MergePreferred {
@ -65,10 +114,7 @@ object MergePreferred {
*
* @param secondaryPorts number of secondary input ports
*/
def apply[T](secondaryPorts: Int): MergePreferred[T] = {
val shape = new MergePreferredShape[T](secondaryPorts, "MergePreferred")
new MergePreferred(secondaryPorts, shape, new MergePreferredModule(shape, Attributes.name("MergePreferred")))
}
def apply[T](secondaryPorts: Int): MergePreferred[T] = new MergePreferred(secondaryPorts)
}
/**
@ -88,15 +134,91 @@ object MergePreferred {
*
* A `Broadcast` has one `in` port and 2 or more `out` ports.
*/
class MergePreferred[T] private (secondaryPorts: Int,
override val shape: MergePreferred.MergePreferredShape[T],
private[stream] override val module: StreamLayout.Module)
extends Graph[MergePreferred.MergePreferredShape[T], Unit] {
class MergePreferred[T] private (secondaryPorts: Int) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
override val shape: MergePreferred.MergePreferredShape[T] =
new MergePreferred.MergePreferredShape(secondaryPorts, "MergePreferred")
override def withAttributes(attr: Attributes): MergePreferred[T] =
new MergePreferred(secondaryPorts, shape, module.withAttributes(attr).nest())
def in(id: Int): Inlet[T] = shape.in(id)
def out: Outlet[T] = shape.out
def preferred: Inlet[T] = shape.preferred
override def named(name: String): MergePreferred[T] = withAttributes(Attributes.name(name))
// FIXME: Factor out common stuff with Merge
override def createLogic: GraphStageLogic = new GraphStageLogic {
private var initialized = false
private val pendingQueue = Array.ofDim[Inlet[T]](secondaryPorts)
private var pendingHead = 0
private var pendingTail = 0
private var runningUpstreams = secondaryPorts + 1
private def upstreamsClosed = runningUpstreams == 0
private def pending: Boolean = pendingHead != pendingTail
private def priority: Boolean = isAvailable(preferred)
private def enqueue(in: Inlet[T]): Unit = {
pendingQueue(pendingTail % secondaryPorts) = in
pendingTail += 1
}
private def dequeueAndDispatch(): Unit = {
val in = pendingQueue(pendingHead % secondaryPorts)
pendingHead += 1
push(out, grab(in))
if (upstreamsClosed && !pending && !priority) completeStage()
else tryPull(in)
}
private def tryPull(in: Inlet[T]): Unit = if (!isClosed(in)) pull(in)
// FIXME: slow iteration, try to make in a vector and inject into shape instead
(0 until secondaryPorts).map(in).foreach { i
setHandler(i, new InHandler {
override def onPush(): Unit = {
if (isAvailable(out)) {
if (!pending) {
push(out, grab(i))
tryPull(i)
}
} else enqueue(i)
}
override def onUpstreamFinish() = {
runningUpstreams -= 1
if (upstreamsClosed && !pending && !priority) completeStage()
}
})
}
setHandler(preferred, new InHandler {
override def onPush() = {
if (isAvailable(out)) {
push(out, grab(preferred))
tryPull(preferred)
}
}
override def onUpstreamFinish() = {
runningUpstreams -= 1
if (upstreamsClosed && !pending && !priority) completeStage()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (!initialized) {
initialized = true
// FIXME: slow iteration, try to make in a vector and inject into shape instead
tryPull(preferred)
(0 until secondaryPorts).map(in).foreach(tryPull)
} else if (priority) {
push(out, grab(preferred))
tryPull(preferred)
} else if (pending)
dequeueAndDispatch()
}
})
}
}
object Broadcast {
@ -107,8 +229,7 @@ object Broadcast {
* @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel.
*/
def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T] = {
val shape = new UniformFanOutShape[T, T](outputPorts)
new Broadcast(outputPorts, shape, new BroadcastModule(shape, eagerCancel, Attributes.name("Broadcast")))
new Broadcast(outputPorts, eagerCancel)
}
}
@ -126,15 +247,73 @@ object Broadcast {
* If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel
*
*/
class Broadcast[T] private (outputPorts: Int,
override val shape: UniformFanOutShape[T, T],
private[stream] override val module: StreamLayout.Module)
extends Graph[UniformFanOutShape[T, T], Unit] {
class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
val in: Inlet[T] = Inlet[T]("Broadast.in")
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i Outlet[T]("Broadcast.out" + i))
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*)
override def withAttributes(attr: Attributes): Broadcast[T] =
new Broadcast(outputPorts, shape, module.withAttributes(attr).nest())
override def createLogic: GraphStageLogic = new GraphStageLogic {
private var pendingCount = outputPorts
private val pending = Array.fill[Boolean](outputPorts)(true)
private var downstreamsRunning = outputPorts
setHandler(in, new InHandler {
override def onPush(): Unit = {
pendingCount = downstreamsRunning
val elem = grab(in)
var idx = 0
val itr = out.iterator
while (itr.hasNext) {
val o = itr.next()
val i = idx
if (!isClosed(o)) {
push(o, elem)
pending(i) = true
}
idx += 1
}
}
})
private def tryPull(): Unit =
if (pendingCount == 0 && !hasBeenPulled(in)) pull(in)
{
var idx = 0
val itr = out.iterator
while (itr.hasNext) {
val out = itr.next()
val i = idx
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pending(i) = false
pendingCount -= 1
tryPull()
}
override def onDownstreamFinish() = {
if (eagerCancel) completeStage()
else {
downstreamsRunning -= 1
if (downstreamsRunning == 0) completeStage()
else if (pending(i)) {
pending(i) = false
pendingCount -= 1
tryPull()
}
}
}
})
idx += 1
}
}
}
override def toString = "Broadcast"
override def named(name: String): Broadcast[T] = withAttributes(Attributes.name(name))
}
object Balance {
@ -147,9 +326,7 @@ object Balance {
* default value is `false`
*/
def apply[T](outputPorts: Int, waitForAllDownstreams: Boolean = false): Balance[T] = {
val shape = new UniformFanOutShape[T, T](outputPorts)
new Balance(outputPorts, waitForAllDownstreams, shape,
new BalanceModule(shape, waitForAllDownstreams, Attributes.name("Balance")))
new Balance(outputPorts, waitForAllDownstreams)
}
}
@ -168,26 +345,77 @@ object Balance {
*
* '''Cancels when''' all downstreams cancel
*/
class Balance[T] private (outputPorts: Int,
waitForAllDownstreams: Boolean,
override val shape: UniformFanOutShape[T, T],
private[stream] override val module: StreamLayout.Module)
extends Graph[UniformFanOutShape[T, T], Unit] {
class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
val in: Inlet[T] = Inlet[T]("Balance.in")
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i Outlet[T]("Balance.out" + i))
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*)
override def withAttributes(attr: Attributes): Balance[T] =
new Balance(outputPorts, waitForAllDownstreams, shape, module.withAttributes(attr).nest())
override def createLogic: GraphStageLogic = new GraphStageLogic {
private val pendingQueue = Array.ofDim[Outlet[T]](outputPorts)
private var pendingHead: Int = 0
private var pendingTail: Int = 0
override def named(name: String): Balance[T] = withAttributes(Attributes.name(name))
private var needDownstreamPulls: Int = if (waitForAllDownstreams) outputPorts else 0
private var downstreamsRunning: Int = outputPorts
private def noPending: Boolean = pendingHead == pendingTail
private def enqueue(out: Outlet[T]): Unit = {
pendingQueue(pendingTail % outputPorts) = out
pendingTail += 1
}
private def dequeueAndDispatch(): Unit = {
val out = pendingQueue(pendingHead % outputPorts)
pendingHead += 1
push(out, grab(in))
if (!noPending) pull(in)
}
setHandler(in, new InHandler {
override def onPush(): Unit = dequeueAndDispatch()
})
out.foreach { o
setHandler(o, new OutHandler {
private var hasPulled = false
override def onPull(): Unit = {
if (!hasPulled) {
hasPulled = true
if (needDownstreamPulls > 0) needDownstreamPulls -= 1
}
if (needDownstreamPulls == 0) {
if (isAvailable(in)) {
if (noPending) {
push(o, grab(in))
}
} else {
if (!hasBeenPulled(in)) pull(in)
enqueue(o)
}
} else enqueue(o)
}
override def onDownstreamFinish() = {
downstreamsRunning -= 1
if (downstreamsRunning == 0) completeStage()
else if (!hasPulled && needDownstreamPulls > 0) {
needDownstreamPulls -= 1
if (needDownstreamPulls == 0 && !hasBeenPulled(in)) pull(in)
}
}
})
}
}
override def toString = "Balance"
}
object Zip {
/**
* Create a new `Zip`.
*/
def apply[A, B](): Zip[A, B] = {
val shape = new FanInShape2[A, B, (A, B)]("Zip")
new Zip(shape, new ZipWith2Module[A, B, (A, B)](shape, Keep.both, Attributes.name("Zip")))
}
def apply[A, B](): Zip[A, B] = new Zip()
}
/**
@ -203,14 +431,8 @@ object Zip {
*
* '''Cancels when''' downstream cancels
*/
class Zip[A, B] private (override val shape: FanInShape2[A, B, (A, B)],
private[stream] override val module: StreamLayout.Module)
extends Graph[FanInShape2[A, B, (A, B)], Unit] {
override def withAttributes(attr: Attributes): Zip[A, B] =
new Zip(shape, module.withAttributes(attr).nest())
override def named(name: String): Zip[A, B] = withAttributes(Attributes.name(name))
class Zip[A, B] extends ZipWith2[A, B, (A, B)](Pair.apply) {
override def toString = "Zip"
}
/**
@ -247,25 +469,15 @@ object Unzip {
* Create a new `Unzip`.
*/
def apply[A, B](): Unzip[A, B] = {
val shape = new FanOutShape2[(A, B), A, B]("Unzip")
new Unzip(shape, new UnzipWith2Module[(A, B), A, B](
shape,
_identity.asInstanceOf[((A, B)) (A, B)],
Attributes.name("Unzip")))
new Unzip()
}
}
/**
* Combine the elements of multiple streams into a stream of the combined elements.
*/
class Unzip[A, B] private (override val shape: FanOutShape2[(A, B), A, B],
private[stream] override val module: StreamLayout.Module)
extends Graph[FanOutShape2[(A, B), A, B], Unit] {
override def withAttributes(attr: Attributes): Unzip[A, B] =
new Unzip(shape, module.withAttributes(attr).nest())
override def named(name: String): Unzip[A, B] = withAttributes(Attributes.name(name))
class Unzip[A, B]() extends UnzipWith2[(A, B), A, B](identity) {
override def toString = "Unzip"
}
/**
@ -285,16 +497,15 @@ object Concat {
/**
* Create a new `Concat`.
*/
def apply[T](): Concat[T] = {
val shape = new UniformFanInShape[T, T](2)
new Concat(shape, new ConcatModule(shape, Attributes.name("Concat")))
def apply[T](inputCount: Int = 2): Concat[T] = {
new Concat(inputCount)
}
}
/**
* Takes two streams and outputs one stream formed from the two input streams
* Takes multiple streams and outputs one stream formed from the input streams
* by first emitting all of the elements from the first stream and then emitting
* all of the elements from the second stream.
* all of the elements from the second stream, etc.
*
* A `Concat` has one `first` port, one `second` port and one `out` port.
*
@ -306,14 +517,44 @@ object Concat {
*
* '''Cancels when''' downstream cancels
*/
class Concat[T] private (override val shape: UniformFanInShape[T, T],
private[stream] override val module: StreamLayout.Module)
extends Graph[UniformFanInShape[T, T], Unit] {
class Concat[T](inputCount: Int) extends GraphStage[UniformFanInShape[T, T]] {
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputCount)(i Inlet[T]("Concat.in" + i))
val out: Outlet[T] = Outlet[T]("Concat.out")
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
override def withAttributes(attr: Attributes): Concat[T] =
new Concat(shape, module.withAttributes(attr).nest())
override def createLogic = new GraphStageLogic {
var activeStream: Int = 0
override def named(name: String): Concat[T] = withAttributes(Attributes.name(name))
{
var idxx = 0
val itr = in.iterator
while (itr.hasNext) {
val i = itr.next()
val idx = idxx
setHandler(i, new InHandler {
override def onPush() = {
push(out, grab(i))
}
override def onUpstreamFinish() = {
if (idx == activeStream) {
activeStream += 1
// Skip closed inputs
while (activeStream < inputCount && isClosed(in(activeStream))) activeStream += 1
if (activeStream == inputCount) completeStage()
else if (isAvailable(out)) pull(in(activeStream))
}
}
})
idxx += 1
}
}
setHandler(out, new OutHandler {
override def onPull() = pull(in(activeStream))
})
}
}
object FlowGraph extends GraphApply {