From 7cbc916b4c7a30b78c149c812385506ccede1400 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 7 Dec 2015 14:34:22 +0100 Subject: [PATCH] !str - 19028 - Adds requirements for minimum ports Broadcast == min output ports: 2 Merge == min input ports: 2 MergePreferred == min secondary ports: 1 Concat == min input ports: 2 --- .../scaladsl/FlowGraphCompileSpec.scala | 12 ++++---- .../stream/scaladsl/FlowSectionSpec.scala | 30 ++++--------------- .../stream/scaladsl/ReverseArrowSpec.scala | 19 ++++++++---- .../scaladsl/ZipWithApply.scala.template | 3 +- .../scala/akka/stream/scaladsl/Graph.scala | 27 +++++++++-------- 5 files changed, 40 insertions(+), 51 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala index 2065576573..4050a8ca0f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala @@ -246,19 +246,21 @@ class FlowGraphCompileSpec extends AkkaSpec { fruitMerge ~> Sink.head[Fruit] "fruitMerge ~> Sink.head[Apple]" shouldNot compile - val appleMerge = b.add(Merge[Apple](1)) + val appleMerge = b.add(Merge[Apple](2)) "Source[Fruit](apples) ~> appleMerge" shouldNot compile + Source.empty[Apple] ~> appleMerge Source[Apple](apples) ~> appleMerge appleMerge ~> Sink.head[Fruit] - val appleMerge2 = b.add(Merge[Apple](1)) + val appleMerge2 = b.add(Merge[Apple](2)) + Source.empty[Apple] ~> appleMerge2 Source[Apple](apples) ~> appleMerge2 appleMerge2 ~> Sink.head[Apple] - val fruitBcast = b.add(Broadcast[Fruit](1)) - Source[Fruit](apples) ~> fruitBcast - //Source[Apple](apples) ~> fruitBcast // FIXME: should compile #16997 + val fruitBcast = b.add(Broadcast[Fruit](2)) + Source[Apple](apples) ~> fruitBcast fruitBcast ~> Sink.head[Fruit] + fruitBcast ~> Sink.ignore "fruitBcast ~> Sink.head[Apple]" shouldNot compile val appleBcast = b.add(Broadcast[Apple](2)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala index 64191a9040..466be4790f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala @@ -33,15 +33,8 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) { } "have a nested flow with a different dispatcher" in { - val flow = Flow.fromGraph(GraphDSL.create() { implicit b ⇒ - import GraphDSL.Implicits._ - val bcast1 = b.add(Broadcast[Int](1)) - val bcast2 = b.add(Broadcast[Int](1)) - bcast1 ~> Flow[Int].map(sendThreadNameTo(testActor)) ~> bcast2.in - FlowShape(bcast1.in, bcast2.out(0)) - }).withAttributes(dispatcher("my-dispatcher1")) - - Source.single(1).via(flow).to(Sink.ignore).run() + Source.single(1).via( + Flow[Int].map(sendThreadNameTo(testActor)).withAttributes(dispatcher("my-dispatcher1"))).to(Sink.ignore).run() expectMsgType[String] should include("my-dispatcher1") } @@ -51,21 +44,9 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) { val probe1 = TestProbe() val probe2 = TestProbe() - val flow1 = Flow.fromGraph(GraphDSL.create() { implicit b ⇒ - import GraphDSL.Implicits._ - val bcast1 = b.add(Broadcast[Int](1)) - val bcast2 = b.add(Broadcast[Int](1)) - bcast1 ~> Flow[Int].map(sendThreadNameTo(probe1.ref)) ~> bcast2.in - FlowShape(bcast1.in, bcast2.out(0)) - }).withAttributes(dispatcher("my-dispatcher1")) + val flow1 = Flow[Int].map(sendThreadNameTo(probe1.ref)).withAttributes(dispatcher("my-dispatcher1")) - val flow2 = Flow.fromGraph(GraphDSL.create() { implicit b ⇒ - import GraphDSL.Implicits._ - val bcast1 = b.add(Broadcast[Int](1)) - val bcast2 = b.add(Broadcast[Int](1)) - bcast1 ~> flow1.via(Flow[Int].map(sendThreadNameTo(probe2.ref))) ~> bcast2.in - FlowShape(bcast1.in, bcast2.out(0)) - }).withAttributes(dispatcher("my-dispatcher2")) + val flow2 = flow1.via(Flow[Int].map(sendThreadNameTo(probe2.ref))).withAttributes(dispatcher("my-dispatcher2")) Source.single(1).via(flow2).to(Sink.ignore).run() @@ -75,8 +56,7 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) { } "include name in toString" in { - //FIXME: Flow has no simple toString anymore - pending + pending //FIXME: Flow has no simple toString anymore val n = "Uppercase reverser" val f1 = Flow[String].map(_.toLowerCase) val f2 = Flow[String].map(_.toUpperCase).map(_.reverse).named(n).map(_.toLowerCase) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala index 1733f924ae..70c9c62add 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala @@ -77,8 +77,9 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "work from UniformFanInShape" in { Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b ⇒ s ⇒ - val f: UniformFanInShape[Int, Int] = b.add(Merge[Int](1)) + val f: UniformFanInShape[Int, Int] = b.add(Merge[Int](2)) f <~ source + f <~ Source.empty f ~> s ClosedShape }).run(), 1.second) should ===(Seq(1, 2, 3)) @@ -87,8 +88,9 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "work from UniformFanOutShape" in { Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b ⇒ s ⇒ - val f: UniformFanOutShape[Int, Int] = b.add(Broadcast[Int](1)) + val f: UniformFanOutShape[Int, Int] = b.add(Broadcast[Int](2)) f <~ source + f ~> Sink.ignore f ~> s ClosedShape }).run(), 1.second) should ===(Seq(1, 2, 3)) @@ -133,8 +135,9 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "work towards UniformFanInShape" in { Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b ⇒ s ⇒ - val f: UniformFanInShape[Int, Int] = b.add(Merge[Int](1)) + val f: UniformFanInShape[Int, Int] = b.add(Merge[Int](2)) s <~ f + Source.empty ~> f source ~> f ClosedShape }).run(), 1.second) should ===(Seq(1, 2, 3)) @@ -143,8 +146,9 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "fail towards already full UniformFanInShape" in { Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b ⇒ s ⇒ - val f: UniformFanInShape[Int, Int] = b.add(Merge[Int](1)) + val f: UniformFanInShape[Int, Int] = b.add(Merge[Int](2)) val src = b.add(source) + Source.empty ~> f src ~> f (the[IllegalArgumentException] thrownBy (s <~ f <~ src)).getMessage should include("no more inlets free") ClosedShape @@ -154,8 +158,9 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "work towards UniformFanOutShape" in { Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b ⇒ s ⇒ - val f: UniformFanOutShape[Int, Int] = b.add(Broadcast[Int](1)) + val f: UniformFanOutShape[Int, Int] = b.add(Broadcast[Int](2)) s <~ f + Sink.ignore <~ f source ~> f ClosedShape }).run(), 1.second) should ===(Seq(1, 2, 3)) @@ -164,9 +169,11 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "fail towards already full UniformFanOutShape" in { Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b ⇒ s ⇒ - val f: UniformFanOutShape[Int, Int] = b.add(Broadcast[Int](1)) + val f: UniformFanOutShape[Int, Int] = b.add(Broadcast[Int](2)) + val sink2: SinkShape[Int] = b.add(Sink.ignore) val src = b.add(source) src ~> f + sink2 <~ f (the[IllegalArgumentException] thrownBy (s <~ f <~ src)).getMessage should include("already connected") ClosedShape }).run(), 1.second) should ===(Seq(1, 2, 3)) diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template index 2322b9d0cc..8432b45503 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template @@ -14,9 +14,8 @@ trait ZipWithApply { * * @param zipper zipping-function from the input values to the output value */ - def apply[[#A1#], O](zipper: ([#A1#]) ⇒ O): ZipWith1[[#A1#], O] = { + def apply[[#A1#], O](zipper: ([#A1#]) ⇒ O): ZipWith1[[#A1#], O] = new ZipWith1(zipper) - } # ] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index e745392964..5845290fd7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -36,6 +36,7 @@ object Merge { * '''Cancels when''' downstream cancels */ class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] { + require(inputPorts > 1, "A Merge must have more than 1 input port") val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Merge.in" + i)) val out: Outlet[T] = Outlet[T]("Merge.out") override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) @@ -139,6 +140,7 @@ object MergePreferred { * A `Broadcast` has one `in` port and 2 or more `out` ports. */ class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] { + require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports") override val shape: MergePreferred.MergePreferredShape[T] = new MergePreferred.MergePreferredShape(secondaryPorts, "MergePreferred") @@ -232,9 +234,8 @@ object Broadcast { * @param outputPorts number of output ports * @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel. */ - def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T] = { + def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T] = new Broadcast(outputPorts, eagerCancel) - } } /** @@ -252,6 +253,7 @@ object Broadcast { * */ class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] { + require(outputPorts > 1, "A Broadcast must have more than 1 output ports") val in: Inlet[T] = Inlet[T]("Broadast.in") val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Broadcast.out" + i)) override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*) @@ -329,9 +331,8 @@ object Balance { * elements to downstream outputs until all of them have requested at least one element, * default value is `false` */ - def apply[T](outputPorts: Int, waitForAllDownstreams: Boolean = false): Balance[T] = { + def apply[T](outputPorts: Int, waitForAllDownstreams: Boolean = false): Balance[T] = new Balance(outputPorts, waitForAllDownstreams) - } } /** @@ -350,6 +351,7 @@ object Balance { * '''Cancels when''' all downstreams cancel */ class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) extends GraphStage[UniformFanOutShape[T, T]] { + require(outputPorts > 1, "A Balance must have more than 1 output ports") val in: Inlet[T] = Inlet[T]("Balance.in") val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Balance.out" + i)) override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*) @@ -469,15 +471,13 @@ object Unzip { /** * Create a new `Unzip`. */ - def apply[A, B](): Unzip[A, B] = { - new Unzip() - } + def apply[A, B](): Unzip[A, B] = new Unzip() } /** * Combine the elements of multiple streams into a stream of the combined elements. */ -class Unzip[A, B]() extends UnzipWith2[(A, B), A, B](identity) { +class Unzip[A, B]() extends UnzipWith2[(A, B), A, B](ConstantFun.scalaIdentityFunction) { override def toString = "Unzip" } @@ -498,7 +498,7 @@ object Concat { /** * Create a new `Concat`. */ - def apply[T](inputCount: Int = 2): Concat[T] = new Concat(inputCount) + def apply[T](inputPorts: Int = 2): Concat[T] = new Concat(inputPorts) } /** @@ -516,8 +516,9 @@ object Concat { * * '''Cancels when''' downstream cancels */ -class Concat[T](inputCount: Int) extends GraphStage[UniformFanInShape[T, T]] { - val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputCount)(i ⇒ Inlet[T]("Concat.in" + i)) +class Concat[T](inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] { + require(inputPorts > 1, "A Concat must have more than 1 input ports") + val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Concat.in" + i)) val out: Outlet[T] = Outlet[T]("Concat.out") override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) @@ -539,8 +540,8 @@ class Concat[T](inputCount: Int) extends GraphStage[UniformFanInShape[T, T]] { if (idx == activeStream) { activeStream += 1 // Skip closed inputs - while (activeStream < inputCount && isClosed(in(activeStream))) activeStream += 1 - if (activeStream == inputCount) completeStage() + while (activeStream < inputPorts && isClosed(in(activeStream))) activeStream += 1 + if (activeStream == inputPorts) completeStage() else if (isAvailable(out)) pull(in(activeStream)) } }