=str #19549 #19257 Allow Broadcast(1) and Merge(1)

This commit is contained in:
Johan Andrén 2016-01-21 14:43:26 +01:00
parent b1d99ca5a8
commit 26e2dcb857
3 changed files with 30 additions and 2 deletions

View file

@ -49,6 +49,19 @@ class GraphBroadcastSpec extends AkkaSpec {
c2.expectComplete()
}
"work with one-way broadcast" in assertAllStagesStopped {
val result = Source.fromGraph(GraphDSL.create() { implicit b
val broadcast = b.add(Broadcast[Int](1))
val source = b.add(Source(1 to 3))
source ~> broadcast.in
SourceShape(broadcast.out(0))
}).runFold(Seq[Int]())(_ :+ _)
Await.result(result, 3.seconds) should ===(Seq(1, 2, 3))
}
"work with n-way broadcast" in assertAllStagesStopped {
val headSink = Sink.head[Seq[Int]]

View file

@ -5,6 +5,7 @@ package akka.stream.scaladsl
import akka.stream._
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.testkit._
@ -58,6 +59,18 @@ class GraphMergeSpec extends TwoStreamsSetup {
probe.expectComplete()
}
"work with 1-way merge" in {
val result = Source.fromGraph(GraphDSL.create() { implicit b
val merge = b.add(Merge[Int](1))
val source = b.add(Source(1 to 3))
source ~> merge.in(0)
SourceShape(merge.out)
}).runFold(Seq[Int]())(_ :+ _)
Await.result(result, 3.seconds) should ===(Seq(1, 2, 3))
}
"work with n-way merge" in {
val source1 = Source(List(1))
val source2 = Source(List(2))

View file

@ -39,7 +39,8 @@ object Merge {
* '''Cancels when''' downstream cancels
*/
final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
require(inputPorts > 1, "A Merge must have more than 1 input port")
// one input might seem counter intuitive but saves us from special handling in other places
require(inputPorts >= 1, "A Merge must have one or more input ports")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i Inlet[T]("Merge.in" + i))
val out: Outlet[T] = Outlet[T]("Merge.out")
@ -395,7 +396,8 @@ object Broadcast {
*
*/
final class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
require(outputPorts > 1, "A Broadcast must have more than 1 output ports")
// one input might seem counter intuitive but saves us from special handling in other places
require(outputPorts >= 1, "A Broadcast must have one or more output ports")
val in: Inlet[T] = Inlet[T]("Broadast.in")
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i Outlet[T]("Broadcast.out" + i))
override def initialAttributes = DefaultAttributes.broadcast