diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala index 4cae284754..a855286c87 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala @@ -4,7 +4,7 @@ import scala.concurrent.Await import scala.concurrent.duration._ import scala.concurrent.Future -import akka.stream.{ ClosedShape, ActorMaterializer, ActorMaterializerSettings } +import akka.stream.{ SourceShape, ClosedShape, ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ import akka.stream.testkit.Utils._ @@ -110,6 +110,18 @@ class GraphBalanceSpec extends AkkaSpec { s2.expectComplete() } + "work with one-way merge" in { + val result = Source.fromGraph(GraphDSL.create() { implicit b ⇒ + val balance = b.add(Balance[Int](1)) + val source = b.add(Source(1 to 3)) + + source ~> balance.in + SourceShape(balance.out(0)) + }).runFold(Seq[Int]())(_ :+ _) + + Await.result(result, 3.seconds) should ===(Seq(1, 2, 3)) + } + "work with 5-way balance" in { val sink = Sink.head[Seq[Int]] diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala index f0d9ee16ba..6b277d9ec4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala @@ -59,7 +59,7 @@ class GraphMergeSpec extends TwoStreamsSetup { probe.expectComplete() } - "work with 1-way merge" in { + "work with one-way merge" in { val result = Source.fromGraph(GraphDSL.create() { implicit b ⇒ val merge = b.add(Merge[Int](1)) val source = b.add(Source(1 to 3)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 9fa3e5b684..63ff98c8b6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -396,7 +396,7 @@ object Broadcast { * */ final class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] { - // one input might seem counter intuitive but saves us from special handling in other places + // one output might seem counter intuitive but saves us from special handling in other places require(outputPorts >= 1, "A Broadcast must have one or more output ports") val in: Inlet[T] = Inlet[T]("Broadast.in") val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Broadcast.out" + i)) @@ -599,7 +599,8 @@ object Balance { * '''Cancels when''' all downstreams cancel */ final class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) extends GraphStage[UniformFanOutShape[T, T]] { - require(outputPorts > 1, "A Balance must have more than 1 output ports") + // one output might seem counter intuitive but saves us from special handling in other places + require(outputPorts >= 1, "A Balance must have one or more output ports") val in: Inlet[T] = Inlet[T]("Balance.in") val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Balance.out" + i)) override def initialAttributes = DefaultAttributes.balance