From 63d9ec1c87cd33a0c50768a232a0187b7a2b05f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A1szl=C3=B3=20van=20den=20Hoek?= Date: Tue, 12 Sep 2017 16:14:20 +0200 Subject: [PATCH] More consistency between UniformFanInShape and UniformFanOutShape (#23321) * add+use UniformFanOutShape#outSeq for consistency UniformFanInShape has inSeq, whereas UniformFanOutShape has outArray. There's probably no good reason for this discrepancy. * 2.5.3 is gold, deprecation would start in 2.5.4 * specialize in/outlets where possible * review comments, binary compatibility * optimize imports * use Array copy internally * give FanInShape1N the deprecation treatment also, s/T\d/I\d/g * delete ignored file * process additional review comments * make inArray fully private everywhere * add benchmark heavy on the use of FanInShape.in() * benchmark says: do not use Array for most n * add JavaDoc to un-final'ed defs to not override * change deprecated val to def; do not use it * process patriknw's review comments --- .../akka/stream/GraphBuilderBenchmark.scala | 8 +++-- .../stream/MaterializationBenchmark.scala | 30 +++++++++++++--- .../akka/stream/scaladsl/FlowJoinSpec.scala | 3 +- .../akka/stream/FanInShape.scala.template | 35 ++++++++++++++----- .../akka/stream/FanOutShape.scala.template | 29 +++++++++++---- .../akka/stream/impl/fusing/GraphStages.scala | 2 +- .../scala/akka/stream/scaladsl/Graph.scala | 34 +++++++++--------- 7 files changed, 98 insertions(+), 43 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala index 3c2da2883d..cb7ff5e492 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala @@ -23,8 +23,12 @@ class GraphBuilderBenchmark { MaterializationBenchmark.flowWithMapBuilder(complexity) @Benchmark - def graph_with_junctions(): RunnableGraph[NotUsed] = - MaterializationBenchmark.graphWithJunctionsBuilder(complexity) + def graph_with_junctions_gradual(): RunnableGraph[NotUsed] = + MaterializationBenchmark.graphWithJunctionsGradualBuilder(complexity) + + @Benchmark + def graph_with_junctions_immediate(): RunnableGraph[NotUsed] = + MaterializationBenchmark.graphWithJunctionsImmediateBuilder(complexity) @Benchmark def graph_with_imported_flow(): RunnableGraph[NotUsed] = diff --git a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala index 847547c0a1..b29c1e2bbd 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -8,7 +8,6 @@ import java.util.concurrent.TimeUnit import akka.NotUsed import akka.actor.ActorSystem import akka.stream.scaladsl._ -import akka.util.ConstantFun import org.openjdk.jmh.annotations._ import scala.concurrent.Await import scala.concurrent.duration._ @@ -25,7 +24,7 @@ object MaterializationBenchmark { source.to(Sink.ignore) } - val graphWithJunctionsBuilder = (numOfJunctions: Int) => + val graphWithJunctionsGradualBuilder = (numOfJunctions: Int) => RunnableGraph.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ @@ -43,6 +42,21 @@ object MaterializationBenchmark { ClosedShape }) + val graphWithJunctionsImmediateBuilder = (numOfJunctions: Int) => + RunnableGraph.fromGraph(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + + val broadcast = b.add(Broadcast[Unit](numOfJunctions)) + val merge = b.add(Merge[Unit](numOfJunctions)) + for (i <- 0 until numOfJunctions) { + broadcast ~> merge + } + + Source.single(()) ~> broadcast + merge ~> Sink.ignore + ClosedShape + }) + val graphWithImportedFlowBuilder = (numOfFlows: Int) => RunnableGraph.fromGraph(GraphDSL.create(Source.single(())) { implicit b ⇒ source ⇒ import GraphDSL.Implicits._ @@ -80,13 +94,15 @@ object MaterializationBenchmark { @OutputTimeUnit(TimeUnit.SECONDS) @BenchmarkMode(Array(Mode.Throughput)) class MaterializationBenchmark { + import MaterializationBenchmark._ implicit val system = ActorSystem("MaterializationBenchmark") implicit val materializer = ActorMaterializer() var flowWithMap: RunnableGraph[NotUsed] = _ - var graphWithJunctions: RunnableGraph[NotUsed] = _ + var graphWithJunctionsGradual: RunnableGraph[NotUsed] = _ + var graphWithJunctionsImmediate: RunnableGraph[NotUsed] = _ var graphWithImportedFlow: RunnableGraph[NotUsed] = _ var subStream: RunnableGraph[Future[Unit]] = _ @@ -96,7 +112,8 @@ class MaterializationBenchmark { @Setup def setup(): Unit = { flowWithMap = flowWithMapBuilder(complexity) - graphWithJunctions = graphWithJunctionsBuilder(complexity) + graphWithJunctionsGradual = graphWithJunctionsGradualBuilder(complexity) + graphWithJunctionsImmediate = graphWithJunctionsImmediateBuilder(complexity) graphWithImportedFlow = graphWithImportedFlowBuilder(complexity) subStream = subStreamBuilder(complexity) } @@ -110,7 +127,10 @@ class MaterializationBenchmark { def flow_with_map(): NotUsed = flowWithMap.run() @Benchmark - def graph_with_junctions(): NotUsed = graphWithJunctions.run() + def graph_with_junctions_gradual(): NotUsed = graphWithJunctionsGradual.run() + + @Benchmark + def graph_with_junctions_immediate(): NotUsed = graphWithJunctionsImmediate.run() @Benchmark def graph_with_imported_flow(): NotUsed = graphWithImportedFlow.run() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowJoinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowJoinSpec.scala index d2dd948a63..c6d33e69d3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowJoinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowJoinSpec.scala @@ -3,12 +3,11 @@ */ package akka.stream.scaladsl -import akka.stream.{ FlowShape, ActorMaterializer, ActorMaterializerSettings, OverflowStrategy } +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, FlowShape, OverflowStrategy } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl._ import com.typesafe.config.ConfigFactory -import org.scalatest.concurrent.ScalaFutures import org.scalatest.time._ import scala.collection.immutable diff --git a/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template b/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template index 9a6ba96b19..a7c6985977 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template @@ -4,7 +4,6 @@ package akka.stream import scala.collection.immutable -import scala.annotation.varargs import scala.annotation.unchecked.uncheckedVariance object FanInShape { @@ -28,9 +27,15 @@ abstract class FanInShape[+O] private (_out: Outlet[O @uncheckedVariance], _regi def this(init: FanInShape.Init[O]) = this(init.outlet, init.inlets.iterator, init.name) final def out: Outlet[O @uncheckedVariance] = _out - final override def outlets: immutable.Seq[Outlet[_]] = _out :: Nil - final override def inlets: immutable.Seq[Inlet[_]] = _inlets + final override def outlets: immutable.Seq[Outlet[O @uncheckedVariance]] = _out :: Nil + /** + * Not meant for overriding outside of Akka. + */ + override def inlets: immutable.Seq[Inlet[_]] = _inlets + /** + * Performance of subclass `UniformFanInShape` relies on `_inlets` being a `Vector`, not a `List`. + */ private var _inlets: Vector[Inlet[_]] = Vector.empty protected def newInlet[T](name: String): Inlet[T] = { val p = if (_registered.hasNext) _registered.next().asInstanceOf[Inlet[T]] else Inlet[T](s"${_name}.$name") @@ -54,28 +59,42 @@ object UniformFanInShape { } class UniformFanInShape[-T, +O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { + + //ports get added to `FanInShape.inlets` as a side-effect of calling `newInlet` + for (i <- 0 until n) newInlet[T](s"in$i") + def this(n: Int) = this(n, FanInShape.Name[O]("UniformFanIn")) def this(n: Int, name: String) = this(n, FanInShape.Name[O](name)) def this(outlet: Outlet[O], inlets: Array[Inlet[T]]) = this(inlets.length, FanInShape.Ports(outlet, inlets.toList)) override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = new UniformFanInShape(n, init) override def deepCopy(): UniformFanInShape[T, O] = super.deepCopy().asInstanceOf[UniformFanInShape[T, O]] - val inSeq: immutable.IndexedSeq[Inlet[T @uncheckedVariance]] = Vector.tabulate(n)(i => newInlet[T](s"in$i")) - def in(n: Int): Inlet[T @uncheckedVariance] = inSeq(n) + final override def inlets: immutable.Seq[Inlet[T @uncheckedVariance]] = super.inlets.asInstanceOf[immutable.Seq[Inlet[T]]] + + @deprecated("Use `inlets` or `in(id)` instead.", "2.5.5") + lazy val inSeq: immutable.IndexedSeq[Inlet[T @uncheckedVariance]] = inlets.toIndexedSeq + def in(n: Int): Inlet[T @uncheckedVariance] = inlets(n) } class FanInShape1N[-T0, -T1, +O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { + + //ports get added to `FanInShape.inlets` as a side-effect of calling `newInlet` + val in0: Inlet[T0 @uncheckedVariance] = newInlet[T0]("in0") + for (i <- 1 until n) newInlet[T1](s"in$i") + def this(n: Int) = this(n, FanInShape.Name[O]("FanInShape1N")) def this(n: Int, name: String) = this(n, FanInShape.Name[O](name)) def this(outlet: Outlet[O @uncheckedVariance], in0: Inlet[T0 @uncheckedVariance], inlets1: Array[Inlet[T1 @uncheckedVariance]]) = this(inlets1.length, FanInShape.Ports(outlet, in0 :: inlets1.toList)) override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = new FanInShape1N(n, init) override def deepCopy(): FanInShape1N[T0, T1, O] = super.deepCopy().asInstanceOf[FanInShape1N[T0, T1, O]] - val in0: Inlet[T0 @uncheckedVariance] = newInlet[T0]("in0") - val in1Seq: immutable.IndexedSeq[Inlet[T1 @uncheckedVariance]] = Vector.tabulate(n)(i => newInlet[T1](s"in${i+1}")) + @deprecated("Use `inlets` or `in(id)` instead.", "2.5.5") + lazy val in1Seq: immutable.IndexedSeq[Inlet[T1 @uncheckedVariance]] = inlets + .tail //head is in0 + .toIndexedSeq.asInstanceOf[immutable.IndexedSeq[Inlet[T1]]] def in(n: Int): Inlet[T1 @uncheckedVariance] = { require(n > 0, "n must be > 0") - in1Seq(n - 1) + inlets(n).asInstanceOf[Inlet[T1]] } } diff --git a/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template b/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template index 7be46474f4..5e63d20ac8 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template @@ -27,9 +27,16 @@ abstract class FanOutShape[-I] private (_in: Inlet[I @uncheckedVariance], _regis def this(init: FanOutShape.Init[I]) = this(init.inlet, init.outlets.iterator, init.name) final def in: Inlet[I @uncheckedVariance] = _in - final override def outlets: immutable.Seq[Outlet[_]] = _outlets - final override def inlets: immutable.Seq[Inlet[_]] = in :: Nil - + + /** + * Not meant for overriding outside of Akka. + */ + override def outlets: immutable.Seq[Outlet[_]] = _outlets + final override def inlets: immutable.Seq[Inlet[I @uncheckedVariance]] = in :: Nil + + /** + * Performance of subclass `UniformFanOutShape` relies on `_outlets` being a `Vector`, not a `List`. + */ private var _outlets: Vector[Outlet[_]] = Vector.empty protected def newOutlet[T](name: String): Outlet[T] = { val p = if (_registered.hasNext) _registered.next().asInstanceOf[Outlet[T]] else Outlet[T](s"${_name}.$name") @@ -53,14 +60,22 @@ object UniformFanOutShape { } class UniformFanOutShape[-I, +O](n: Int, _init: FanOutShape.Init[I @uncheckedVariance]) extends FanOutShape[I](_init) { + + //initialize by side-effect + for (i <- 0 until n) newOutlet[O](s"out$i") + def this(n: Int) = this(n, FanOutShape.Name[I]("UniformFanOut")) def this(n: Int, name: String) = this(n, FanOutShape.Name[I](name)) - def this(inlet: Inlet[I], outlets: Array[Outlet[O]]) = this(outlets.size, FanOutShape.Ports(inlet, outlets.toList)) + def this(inlet: Inlet[I], outlets: Array[Outlet[O]]) = this(outlets.length, FanOutShape.Ports(inlet, outlets.toList)) override protected def construct(init: FanOutShape.Init[I @uncheckedVariance]): FanOutShape[I] = new UniformFanOutShape(n, init) override def deepCopy(): UniformFanOutShape[I, O] = super.deepCopy().asInstanceOf[UniformFanOutShape[I, O]] - - val outArray: Array[Outlet[O @uncheckedVariance]] = Array.tabulate(n)(i => newOutlet[O](s"out$i")) - def out(n: Int): Outlet[O @uncheckedVariance] = outArray(n) + + final override def outlets: immutable.Seq[Outlet[O @uncheckedVariance]] = super.outlets.asInstanceOf[immutable.Seq[Outlet[O]]] + + @Deprecated + @deprecated("use `outlets` or `out(id)` instead", "2.5.5") + lazy val outArray: Array[Outlet[O @uncheckedVariance]] = outlets.toArray + def out(n: Int): Outlet[O @uncheckedVariance] = outlets(n) } [2..#class FanOutShape1[-I, [#+O0#]](_init: FanOutShape.Init[I @uncheckedVariance]) extends FanOutShape[I](_init) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 30848abc29..4251b48947 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -455,7 +455,7 @@ import scala.concurrent.{ Future, Promise } GraphDSL.create() { implicit builder ⇒ import GraphDSL.Implicits._ val concat = builder.add(stage) - val ds = concat.inSeq.map { inlet ⇒ + val ds = concat.inlets.map { inlet ⇒ val detacher = builder.add(GraphStages.detacher[T]) detacher ~> inlet detacher.in diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 75fab19fcb..bea6b280f9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -181,7 +181,7 @@ object MergePreferred { * Merge several streams, taking elements as they arrive from input streams * (picking from preferred when several have elements ready). * - * A `MergePreferred` has one `out` port, one `preferred` input port and 0 or more secondary `in` ports. + * A `MergePreferred` has one `out` port, one `preferred` input port and 1 or more secondary `in` ports. * * '''Emits when''' one of the inputs has an element available, preferring * a specified input if multiple have elements available @@ -191,11 +191,9 @@ object MergePreferred { * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false` * * '''Cancels when''' downstream cancels - * - * A `Broadcast` has one `in` port and 2 or more `out` ports. */ final class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] { - require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports") + require(secondaryPorts >= 1, "A MergePreferred must have 1 or more secondary input ports") override def initialAttributes = DefaultAttributes.mergePreferred override val shape: MergePreferred.MergePreferredShape[T] = @@ -213,8 +211,8 @@ final class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolea } override def preStart(): Unit = { - tryPull(preferred) - shape.inSeq.foreach(tryPull) + //while initializing this `MergePreferredShape`, the `preferred` port gets added to `inlets` by side-effect. + shape.inlets.foreach(tryPull) } setHandler(out, eagerTerminateOutput) @@ -304,8 +302,6 @@ object MergePrioritized { * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false` * * '''Cancels when''' downstream cancels - * - * A `Broadcast` has one `in` port and 2 or more `out` ports. */ final class MergePrioritized[T] private (val priorities: Seq[Int], val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] { private val inputPorts = priorities.size @@ -969,8 +965,10 @@ object ZipWithN { class ZipWithN[A, O](zipper: immutable.Seq[A] ⇒ O)(n: Int) extends GraphStage[UniformFanInShape[A, O]] { override def initialAttributes = DefaultAttributes.zipWithN override val shape = new UniformFanInShape[A, O](n) - def out = shape.out - val inSeq = shape.inSeq + def out: Outlet[O] = shape.out + + @deprecated("use `shape.inlets` or `shape.in(id)` instead", "2.5.5") + def inSeq: immutable.IndexedSeq[Inlet[A]] = shape.inlets.asInstanceOf[immutable.IndexedSeq[Inlet[A]]] override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { var pending = 0 @@ -981,16 +979,16 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] ⇒ O)(n: Int) extends GraphStage[ val pullInlet = pull[A] _ private def pushAll(): Unit = { - push(out, zipper(inSeq.map(grabInlet))) + push(out, zipper(shape.inlets.map(grabInlet))) if (willShutDown) completeStage() - else inSeq.foreach(pullInlet) + else shape.inlets.foreach(pullInlet) } override def preStart(): Unit = { - inSeq.foreach(pullInlet) + shape.inlets.foreach(pullInlet) } - inSeq.foreach(in ⇒ { + shape.inlets.foreach(in ⇒ { setHandler(in, new InHandler { override def onPush(): Unit = { pending -= 1 @@ -1288,7 +1286,7 @@ object GraphDSL extends GraphApply { @tailrec private[stream] def findOut[I, O](b: Builder[_], junction: UniformFanOutShape[I, O], n: Int): Outlet[O] = { - if (n == junction.outArray.length) + if (n == junction.outlets.length) throw new IllegalArgumentException(s"no more outlets free on $junction") else if (!b.traversalBuilder.isUnwired(junction.out(n))) findOut(b, junction, n + 1) else junction.out(n) @@ -1296,7 +1294,7 @@ object GraphDSL extends GraphApply { @tailrec private[stream] def findIn[I, O](b: Builder[_], junction: UniformFanInShape[I, O], n: Int): Inlet[I] = { - if (n == junction.inSeq.length) + if (n == junction.inlets.length) throw new IllegalArgumentException(s"no more inlets free on $junction") else if (!b.traversalBuilder.isUnwired(junction.in(n))) findIn(b, junction, n + 1) else junction.in(n) @@ -1316,7 +1314,7 @@ object GraphDSL extends GraphApply { def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = { def bind(n: Int): Unit = { - if (n == junction.inSeq.length) + if (n == junction.inlets.length) throw new IllegalArgumentException(s"no more inlets free on $junction") else if (!b.traversalBuilder.isUnwired(junction.in(n))) bind(n + 1) else b.addEdge(importAndGetPort(b), junction.in(n)) @@ -1359,7 +1357,7 @@ object GraphDSL extends GraphApply { def <~[In](junction: UniformFanOutShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = { def bind(n: Int): Unit = { - if (n == junction.outArray.length) + if (n == junction.outlets.length) throw new IllegalArgumentException(s"no more outlets free on $junction") else if (!b.traversalBuilder.isUnwired(junction.out(n))) bind(n + 1) else b.addEdge(junction.out(n), importAndGetPortReverse(b))