Merge pull request #19509 from akka/wip-misc-streams-thingies-√

Minor streams touchups
This commit is contained in:
Roland Kuhn 2016-01-19 17:01:52 +01:00
commit ec18320d5a
9 changed files with 74 additions and 70 deletions

View file

@ -7,7 +7,7 @@ import akka.stream._
import akka.stream.impl._
import akka.stream.impl.fusing.GraphStages
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.impl.Stages.{ StageModule, SymbolicStage }
import akka.stream.impl.Stages.{DefaultAttributes, StageModule, SymbolicStage}
import akka.stream.impl.StreamLayout._
import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage }
import scala.annotation.unchecked.uncheckedVariance
@ -42,7 +42,7 @@ final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) e
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i Inlet[T]("Merge.in" + i))
val out: Outlet[T] = Outlet[T]("Merge.out")
override def initialAttributes = Attributes.name("Merge")
override def initialAttributes = DefaultAttributes.merge
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
@ -145,7 +145,7 @@ object MergePreferred {
final class MergePreferred[T] private (val secondaryPorts: Int, val eagerComplete: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports")
override def initialAttributes = Attributes.name("MergePreferred")
override def initialAttributes = DefaultAttributes.mergePreferred
override val shape: MergePreferred.MergePreferredShape[T] =
new MergePreferred.MergePreferredShape(secondaryPorts, "MergePreferred")
@ -397,7 +397,7 @@ final class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) ext
require(outputPorts > 1, "A Broadcast must have more than 1 output ports")
val in: Inlet[T] = Inlet[T]("Broadast.in")
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i Outlet[T]("Broadcast.out" + i))
override def initialAttributes = Attributes.name("Broadcast")
override def initialAttributes = DefaultAttributes.broadcast
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
@ -599,7 +599,7 @@ final class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) ext
require(outputPorts > 1, "A Balance must have more than 1 output ports")
val in: Inlet[T] = Inlet[T]("Balance.in")
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i Outlet[T]("Balance.out" + i))
override def initialAttributes = Attributes.name("Balance")
override def initialAttributes = DefaultAttributes.balance
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
@ -767,7 +767,7 @@ final class Concat[T](inputPorts: Int) extends GraphStage[UniformFanInShape[T, T
require(inputPorts > 1, "A Concat must have more than 1 input ports")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i Inlet[T]("Concat.in" + i))
val out: Outlet[T] = Outlet[T]("Concat.out")
override def initialAttributes = Attributes.name("Concat")
override def initialAttributes = DefaultAttributes.concat
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {