More consistency between UniformFanInShape and UniformFanOutShape (#23321)

* add+use UniformFanOutShape#outSeq for consistency

UniformFanInShape has inSeq, whereas UniformFanOutShape has outArray.
There's probably no good reason for this discrepancy.

* 2.5.3 is gold, deprecation would start in 2.5.4

* specialize in/outlets where possible

* review comments, binary compatibility

* optimize imports

* use Array copy internally

* give FanInShape1N the deprecation treatment

also, s/T\d/I\d/g

* delete ignored file

* process additional review comments

* make inArray fully private everywhere

* add benchmark heavy on the use of FanInShape.in()

* benchmark says: do not use Array for most n

* add JavaDoc to un-final'ed defs to not override

* change deprecated val to def; do not use it

* process patriknw's review comments
This commit is contained in:
László van den Hoek 2017-09-12 16:14:20 +02:00 committed by Patrik Nordwall
parent e57cab6c85
commit 63d9ec1c87
7 changed files with 98 additions and 43 deletions

View file

@ -23,8 +23,12 @@ class GraphBuilderBenchmark {
MaterializationBenchmark.flowWithMapBuilder(complexity)
@Benchmark
def graph_with_junctions(): RunnableGraph[NotUsed] =
MaterializationBenchmark.graphWithJunctionsBuilder(complexity)
def graph_with_junctions_gradual(): RunnableGraph[NotUsed] =
MaterializationBenchmark.graphWithJunctionsGradualBuilder(complexity)
@Benchmark
def graph_with_junctions_immediate(): RunnableGraph[NotUsed] =
MaterializationBenchmark.graphWithJunctionsImmediateBuilder(complexity)
@Benchmark
def graph_with_imported_flow(): RunnableGraph[NotUsed] =

View file

@ -8,7 +8,6 @@ import java.util.concurrent.TimeUnit
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.util.ConstantFun
import org.openjdk.jmh.annotations._
import scala.concurrent.Await
import scala.concurrent.duration._
@ -25,7 +24,7 @@ object MaterializationBenchmark {
source.to(Sink.ignore)
}
val graphWithJunctionsBuilder = (numOfJunctions: Int) =>
val graphWithJunctionsGradualBuilder = (numOfJunctions: Int) =>
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
@ -43,6 +42,21 @@ object MaterializationBenchmark {
ClosedShape
})
val graphWithJunctionsImmediateBuilder = (numOfJunctions: Int) =>
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Unit](numOfJunctions))
val merge = b.add(Merge[Unit](numOfJunctions))
for (i <- 0 until numOfJunctions) {
broadcast ~> merge
}
Source.single(()) ~> broadcast
merge ~> Sink.ignore
ClosedShape
})
val graphWithImportedFlowBuilder = (numOfFlows: Int) =>
RunnableGraph.fromGraph(GraphDSL.create(Source.single(())) { implicit b source
import GraphDSL.Implicits._
@ -80,13 +94,15 @@ object MaterializationBenchmark {
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class MaterializationBenchmark {
import MaterializationBenchmark._
implicit val system = ActorSystem("MaterializationBenchmark")
implicit val materializer = ActorMaterializer()
var flowWithMap: RunnableGraph[NotUsed] = _
var graphWithJunctions: RunnableGraph[NotUsed] = _
var graphWithJunctionsGradual: RunnableGraph[NotUsed] = _
var graphWithJunctionsImmediate: RunnableGraph[NotUsed] = _
var graphWithImportedFlow: RunnableGraph[NotUsed] = _
var subStream: RunnableGraph[Future[Unit]] = _
@ -96,7 +112,8 @@ class MaterializationBenchmark {
@Setup
def setup(): Unit = {
flowWithMap = flowWithMapBuilder(complexity)
graphWithJunctions = graphWithJunctionsBuilder(complexity)
graphWithJunctionsGradual = graphWithJunctionsGradualBuilder(complexity)
graphWithJunctionsImmediate = graphWithJunctionsImmediateBuilder(complexity)
graphWithImportedFlow = graphWithImportedFlowBuilder(complexity)
subStream = subStreamBuilder(complexity)
}
@ -110,7 +127,10 @@ class MaterializationBenchmark {
def flow_with_map(): NotUsed = flowWithMap.run()
@Benchmark
def graph_with_junctions(): NotUsed = graphWithJunctions.run()
def graph_with_junctions_gradual(): NotUsed = graphWithJunctionsGradual.run()
@Benchmark
def graph_with_junctions_immediate(): NotUsed = graphWithJunctionsImmediate.run()
@Benchmark
def graph_with_imported_flow(): NotUsed = graphWithImportedFlow.run()

View file

@ -3,12 +3,11 @@
*/
package akka.stream.scaladsl
import akka.stream.{ FlowShape, ActorMaterializer, ActorMaterializerSettings, OverflowStrategy }
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, FlowShape, OverflowStrategy }
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl._
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time._
import scala.collection.immutable

View file

@ -4,7 +4,6 @@
package akka.stream
import scala.collection.immutable
import scala.annotation.varargs
import scala.annotation.unchecked.uncheckedVariance
object FanInShape {
@ -28,9 +27,15 @@ abstract class FanInShape[+O] private (_out: Outlet[O @uncheckedVariance], _regi
def this(init: FanInShape.Init[O]) = this(init.outlet, init.inlets.iterator, init.name)
final def out: Outlet[O @uncheckedVariance] = _out
final override def outlets: immutable.Seq[Outlet[_]] = _out :: Nil
final override def inlets: immutable.Seq[Inlet[_]] = _inlets
final override def outlets: immutable.Seq[Outlet[O @uncheckedVariance]] = _out :: Nil
/**
* Not meant for overriding outside of Akka.
*/
override def inlets: immutable.Seq[Inlet[_]] = _inlets
/**
* Performance of subclass `UniformFanInShape` relies on `_inlets` being a `Vector`, not a `List`.
*/
private var _inlets: Vector[Inlet[_]] = Vector.empty
protected def newInlet[T](name: String): Inlet[T] = {
val p = if (_registered.hasNext) _registered.next().asInstanceOf[Inlet[T]] else Inlet[T](s"${_name}.$name")
@ -54,28 +59,42 @@ object UniformFanInShape {
}
class UniformFanInShape[-T, +O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) {
//ports get added to `FanInShape.inlets` as a side-effect of calling `newInlet`
for (i <- 0 until n) newInlet[T](s"in$i")
def this(n: Int) = this(n, FanInShape.Name[O]("UniformFanIn"))
def this(n: Int, name: String) = this(n, FanInShape.Name[O](name))
def this(outlet: Outlet[O], inlets: Array[Inlet[T]]) = this(inlets.length, FanInShape.Ports(outlet, inlets.toList))
override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = new UniformFanInShape(n, init)
override def deepCopy(): UniformFanInShape[T, O] = super.deepCopy().asInstanceOf[UniformFanInShape[T, O]]
val inSeq: immutable.IndexedSeq[Inlet[T @uncheckedVariance]] = Vector.tabulate(n)(i => newInlet[T](s"in$i"))
def in(n: Int): Inlet[T @uncheckedVariance] = inSeq(n)
final override def inlets: immutable.Seq[Inlet[T @uncheckedVariance]] = super.inlets.asInstanceOf[immutable.Seq[Inlet[T]]]
@deprecated("Use `inlets` or `in(id)` instead.", "2.5.5")
lazy val inSeq: immutable.IndexedSeq[Inlet[T @uncheckedVariance]] = inlets.toIndexedSeq
def in(n: Int): Inlet[T @uncheckedVariance] = inlets(n)
}
class FanInShape1N[-T0, -T1, +O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) {
//ports get added to `FanInShape.inlets` as a side-effect of calling `newInlet`
val in0: Inlet[T0 @uncheckedVariance] = newInlet[T0]("in0")
for (i <- 1 until n) newInlet[T1](s"in$i")
def this(n: Int) = this(n, FanInShape.Name[O]("FanInShape1N"))
def this(n: Int, name: String) = this(n, FanInShape.Name[O](name))
def this(outlet: Outlet[O @uncheckedVariance], in0: Inlet[T0 @uncheckedVariance], inlets1: Array[Inlet[T1 @uncheckedVariance]]) = this(inlets1.length, FanInShape.Ports(outlet, in0 :: inlets1.toList))
override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = new FanInShape1N(n, init)
override def deepCopy(): FanInShape1N[T0, T1, O] = super.deepCopy().asInstanceOf[FanInShape1N[T0, T1, O]]
val in0: Inlet[T0 @uncheckedVariance] = newInlet[T0]("in0")
val in1Seq: immutable.IndexedSeq[Inlet[T1 @uncheckedVariance]] = Vector.tabulate(n)(i => newInlet[T1](s"in${i+1}"))
@deprecated("Use `inlets` or `in(id)` instead.", "2.5.5")
lazy val in1Seq: immutable.IndexedSeq[Inlet[T1 @uncheckedVariance]] = inlets
.tail //head is in0
.toIndexedSeq.asInstanceOf[immutable.IndexedSeq[Inlet[T1]]]
def in(n: Int): Inlet[T1 @uncheckedVariance] = {
require(n > 0, "n must be > 0")
in1Seq(n - 1)
inlets(n).asInstanceOf[Inlet[T1]]
}
}

View file

@ -27,9 +27,16 @@ abstract class FanOutShape[-I] private (_in: Inlet[I @uncheckedVariance], _regis
def this(init: FanOutShape.Init[I]) = this(init.inlet, init.outlets.iterator, init.name)
final def in: Inlet[I @uncheckedVariance] = _in
final override def outlets: immutable.Seq[Outlet[_]] = _outlets
final override def inlets: immutable.Seq[Inlet[_]] = in :: Nil
/**
* Not meant for overriding outside of Akka.
*/
override def outlets: immutable.Seq[Outlet[_]] = _outlets
final override def inlets: immutable.Seq[Inlet[I @uncheckedVariance]] = in :: Nil
/**
* Performance of subclass `UniformFanOutShape` relies on `_outlets` being a `Vector`, not a `List`.
*/
private var _outlets: Vector[Outlet[_]] = Vector.empty
protected def newOutlet[T](name: String): Outlet[T] = {
val p = if (_registered.hasNext) _registered.next().asInstanceOf[Outlet[T]] else Outlet[T](s"${_name}.$name")
@ -53,14 +60,22 @@ object UniformFanOutShape {
}
class UniformFanOutShape[-I, +O](n: Int, _init: FanOutShape.Init[I @uncheckedVariance]) extends FanOutShape[I](_init) {
//initialize by side-effect
for (i <- 0 until n) newOutlet[O](s"out$i")
def this(n: Int) = this(n, FanOutShape.Name[I]("UniformFanOut"))
def this(n: Int, name: String) = this(n, FanOutShape.Name[I](name))
def this(inlet: Inlet[I], outlets: Array[Outlet[O]]) = this(outlets.size, FanOutShape.Ports(inlet, outlets.toList))
def this(inlet: Inlet[I], outlets: Array[Outlet[O]]) = this(outlets.length, FanOutShape.Ports(inlet, outlets.toList))
override protected def construct(init: FanOutShape.Init[I @uncheckedVariance]): FanOutShape[I] = new UniformFanOutShape(n, init)
override def deepCopy(): UniformFanOutShape[I, O] = super.deepCopy().asInstanceOf[UniformFanOutShape[I, O]]
val outArray: Array[Outlet[O @uncheckedVariance]] = Array.tabulate(n)(i => newOutlet[O](s"out$i"))
def out(n: Int): Outlet[O @uncheckedVariance] = outArray(n)
final override def outlets: immutable.Seq[Outlet[O @uncheckedVariance]] = super.outlets.asInstanceOf[immutable.Seq[Outlet[O]]]
@Deprecated
@deprecated("use `outlets` or `out(id)` instead", "2.5.5")
lazy val outArray: Array[Outlet[O @uncheckedVariance]] = outlets.toArray
def out(n: Int): Outlet[O @uncheckedVariance] = outlets(n)
}
[2..#class FanOutShape1[-I, [#+O0#]](_init: FanOutShape.Init[I @uncheckedVariance]) extends FanOutShape[I](_init) {

View file

@ -455,7 +455,7 @@ import scala.concurrent.{ Future, Promise }
GraphDSL.create() { implicit builder
import GraphDSL.Implicits._
val concat = builder.add(stage)
val ds = concat.inSeq.map { inlet
val ds = concat.inlets.map { inlet
val detacher = builder.add(GraphStages.detacher[T])
detacher ~> inlet
detacher.in

View file

@ -181,7 +181,7 @@ object MergePreferred {
* Merge several streams, taking elements as they arrive from input streams
* (picking from preferred when several have elements ready).
*
* A `MergePreferred` has one `out` port, one `preferred` input port and 0 or more secondary `in` ports.
* A `MergePreferred` has one `out` port, one `preferred` input port and 1 or more secondary `in` ports.
*
* '''Emits when''' one of the inputs has an element available, preferring
* a specified input if multiple have elements available
@ -191,11 +191,9 @@ object MergePreferred {
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*
* A `Broadcast` has one `in` port and 2 or more `out` ports.
*/
final class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports")
require(secondaryPorts >= 1, "A MergePreferred must have 1 or more secondary input ports")
override def initialAttributes = DefaultAttributes.mergePreferred
override val shape: MergePreferred.MergePreferredShape[T] =
@ -213,8 +211,8 @@ final class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolea
}
override def preStart(): Unit = {
tryPull(preferred)
shape.inSeq.foreach(tryPull)
//while initializing this `MergePreferredShape`, the `preferred` port gets added to `inlets` by side-effect.
shape.inlets.foreach(tryPull)
}
setHandler(out, eagerTerminateOutput)
@ -304,8 +302,6 @@ object MergePrioritized {
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*
* A `Broadcast` has one `in` port and 2 or more `out` ports.
*/
final class MergePrioritized[T] private (val priorities: Seq[Int], val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
private val inputPorts = priorities.size
@ -969,8 +965,10 @@ object ZipWithN {
class ZipWithN[A, O](zipper: immutable.Seq[A] O)(n: Int) extends GraphStage[UniformFanInShape[A, O]] {
override def initialAttributes = DefaultAttributes.zipWithN
override val shape = new UniformFanInShape[A, O](n)
def out = shape.out
val inSeq = shape.inSeq
def out: Outlet[O] = shape.out
@deprecated("use `shape.inlets` or `shape.in(id)` instead", "2.5.5")
def inSeq: immutable.IndexedSeq[Inlet[A]] = shape.inlets.asInstanceOf[immutable.IndexedSeq[Inlet[A]]]
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
var pending = 0
@ -981,16 +979,16 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] ⇒ O)(n: Int) extends GraphStage[
val pullInlet = pull[A] _
private def pushAll(): Unit = {
push(out, zipper(inSeq.map(grabInlet)))
push(out, zipper(shape.inlets.map(grabInlet)))
if (willShutDown) completeStage()
else inSeq.foreach(pullInlet)
else shape.inlets.foreach(pullInlet)
}
override def preStart(): Unit = {
inSeq.foreach(pullInlet)
shape.inlets.foreach(pullInlet)
}
inSeq.foreach(in {
shape.inlets.foreach(in {
setHandler(in, new InHandler {
override def onPush(): Unit = {
pending -= 1
@ -1288,7 +1286,7 @@ object GraphDSL extends GraphApply {
@tailrec
private[stream] def findOut[I, O](b: Builder[_], junction: UniformFanOutShape[I, O], n: Int): Outlet[O] = {
if (n == junction.outArray.length)
if (n == junction.outlets.length)
throw new IllegalArgumentException(s"no more outlets free on $junction")
else if (!b.traversalBuilder.isUnwired(junction.out(n))) findOut(b, junction, n + 1)
else junction.out(n)
@ -1296,7 +1294,7 @@ object GraphDSL extends GraphApply {
@tailrec
private[stream] def findIn[I, O](b: Builder[_], junction: UniformFanInShape[I, O], n: Int): Inlet[I] = {
if (n == junction.inSeq.length)
if (n == junction.inlets.length)
throw new IllegalArgumentException(s"no more inlets free on $junction")
else if (!b.traversalBuilder.isUnwired(junction.in(n))) findIn(b, junction, n + 1)
else junction.in(n)
@ -1316,7 +1314,7 @@ object GraphDSL extends GraphApply {
def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {
def bind(n: Int): Unit = {
if (n == junction.inSeq.length)
if (n == junction.inlets.length)
throw new IllegalArgumentException(s"no more inlets free on $junction")
else if (!b.traversalBuilder.isUnwired(junction.in(n))) bind(n + 1)
else b.addEdge(importAndGetPort(b), junction.in(n))
@ -1359,7 +1357,7 @@ object GraphDSL extends GraphApply {
def <~[In](junction: UniformFanOutShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = {
def bind(n: Int): Unit = {
if (n == junction.outArray.length)
if (n == junction.outlets.length)
throw new IllegalArgumentException(s"no more outlets free on $junction")
else if (!b.traversalBuilder.isUnwired(junction.out(n))) bind(n + 1)
else b.addEdge(junction.out(n), importAndGetPortReverse(b))