Merge pull request #19116 from akka/wip-19028-require-min-ports-√

!str - 19028 - Adds requirements for minimum ports
This commit is contained in:
Viktor Klang (√) 2015-12-08 14:40:24 +01:00
commit 108688de99
5 changed files with 40 additions and 51 deletions

View file

@ -246,19 +246,21 @@ class FlowGraphCompileSpec extends AkkaSpec {
fruitMerge ~> Sink.head[Fruit]
"fruitMerge ~> Sink.head[Apple]" shouldNot compile
val appleMerge = b.add(Merge[Apple](1))
val appleMerge = b.add(Merge[Apple](2))
"Source[Fruit](apples) ~> appleMerge" shouldNot compile
Source.empty[Apple] ~> appleMerge
Source[Apple](apples) ~> appleMerge
appleMerge ~> Sink.head[Fruit]
val appleMerge2 = b.add(Merge[Apple](1))
val appleMerge2 = b.add(Merge[Apple](2))
Source.empty[Apple] ~> appleMerge2
Source[Apple](apples) ~> appleMerge2
appleMerge2 ~> Sink.head[Apple]
val fruitBcast = b.add(Broadcast[Fruit](1))
Source[Fruit](apples) ~> fruitBcast
//Source[Apple](apples) ~> fruitBcast // FIXME: should compile #16997
val fruitBcast = b.add(Broadcast[Fruit](2))
Source[Apple](apples) ~> fruitBcast
fruitBcast ~> Sink.head[Fruit]
fruitBcast ~> Sink.ignore
"fruitBcast ~> Sink.head[Apple]" shouldNot compile
val appleBcast = b.add(Broadcast[Apple](2))

View file

@ -33,15 +33,8 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) {
}
"have a nested flow with a different dispatcher" in {
val flow = Flow.fromGraph(GraphDSL.create() { implicit b
import GraphDSL.Implicits._
val bcast1 = b.add(Broadcast[Int](1))
val bcast2 = b.add(Broadcast[Int](1))
bcast1 ~> Flow[Int].map(sendThreadNameTo(testActor)) ~> bcast2.in
FlowShape(bcast1.in, bcast2.out(0))
}).withAttributes(dispatcher("my-dispatcher1"))
Source.single(1).via(flow).to(Sink.ignore).run()
Source.single(1).via(
Flow[Int].map(sendThreadNameTo(testActor)).withAttributes(dispatcher("my-dispatcher1"))).to(Sink.ignore).run()
expectMsgType[String] should include("my-dispatcher1")
}
@ -51,21 +44,9 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) {
val probe1 = TestProbe()
val probe2 = TestProbe()
val flow1 = Flow.fromGraph(GraphDSL.create() { implicit b
import GraphDSL.Implicits._
val bcast1 = b.add(Broadcast[Int](1))
val bcast2 = b.add(Broadcast[Int](1))
bcast1 ~> Flow[Int].map(sendThreadNameTo(probe1.ref)) ~> bcast2.in
FlowShape(bcast1.in, bcast2.out(0))
}).withAttributes(dispatcher("my-dispatcher1"))
val flow1 = Flow[Int].map(sendThreadNameTo(probe1.ref)).withAttributes(dispatcher("my-dispatcher1"))
val flow2 = Flow.fromGraph(GraphDSL.create() { implicit b
import GraphDSL.Implicits._
val bcast1 = b.add(Broadcast[Int](1))
val bcast2 = b.add(Broadcast[Int](1))
bcast1 ~> flow1.via(Flow[Int].map(sendThreadNameTo(probe2.ref))) ~> bcast2.in
FlowShape(bcast1.in, bcast2.out(0))
}).withAttributes(dispatcher("my-dispatcher2"))
val flow2 = flow1.via(Flow[Int].map(sendThreadNameTo(probe2.ref))).withAttributes(dispatcher("my-dispatcher2"))
Source.single(1).via(flow2).to(Sink.ignore).run()
@ -75,8 +56,7 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) {
}
"include name in toString" in {
//FIXME: Flow has no simple toString anymore
pending
pending //FIXME: Flow has no simple toString anymore
val n = "Uppercase reverser"
val f1 = Flow[String].map(_.toLowerCase)
val f2 = Flow[String].map(_.toUpperCase).map(_.reverse).named(n).map(_.toLowerCase)

View file

@ -77,8 +77,9 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
"work from UniformFanInShape" in {
Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b
s
val f: UniformFanInShape[Int, Int] = b.add(Merge[Int](1))
val f: UniformFanInShape[Int, Int] = b.add(Merge[Int](2))
f <~ source
f <~ Source.empty
f ~> s
ClosedShape
}).run(), 1.second) should ===(Seq(1, 2, 3))
@ -87,8 +88,9 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
"work from UniformFanOutShape" in {
Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b
s
val f: UniformFanOutShape[Int, Int] = b.add(Broadcast[Int](1))
val f: UniformFanOutShape[Int, Int] = b.add(Broadcast[Int](2))
f <~ source
f ~> Sink.ignore
f ~> s
ClosedShape
}).run(), 1.second) should ===(Seq(1, 2, 3))
@ -133,8 +135,9 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
"work towards UniformFanInShape" in {
Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b
s
val f: UniformFanInShape[Int, Int] = b.add(Merge[Int](1))
val f: UniformFanInShape[Int, Int] = b.add(Merge[Int](2))
s <~ f
Source.empty ~> f
source ~> f
ClosedShape
}).run(), 1.second) should ===(Seq(1, 2, 3))
@ -143,8 +146,9 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
"fail towards already full UniformFanInShape" in {
Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b
s
val f: UniformFanInShape[Int, Int] = b.add(Merge[Int](1))
val f: UniformFanInShape[Int, Int] = b.add(Merge[Int](2))
val src = b.add(source)
Source.empty ~> f
src ~> f
(the[IllegalArgumentException] thrownBy (s <~ f <~ src)).getMessage should include("no more inlets free")
ClosedShape
@ -154,8 +158,9 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
"work towards UniformFanOutShape" in {
Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b
s
val f: UniformFanOutShape[Int, Int] = b.add(Broadcast[Int](1))
val f: UniformFanOutShape[Int, Int] = b.add(Broadcast[Int](2))
s <~ f
Sink.ignore <~ f
source ~> f
ClosedShape
}).run(), 1.second) should ===(Seq(1, 2, 3))
@ -164,9 +169,11 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
"fail towards already full UniformFanOutShape" in {
Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b
s
val f: UniformFanOutShape[Int, Int] = b.add(Broadcast[Int](1))
val f: UniformFanOutShape[Int, Int] = b.add(Broadcast[Int](2))
val sink2: SinkShape[Int] = b.add(Sink.ignore)
val src = b.add(source)
src ~> f
sink2 <~ f
(the[IllegalArgumentException] thrownBy (s <~ f <~ src)).getMessage should include("already connected")
ClosedShape
}).run(), 1.second) should ===(Seq(1, 2, 3))

View file

@ -14,9 +14,8 @@ trait ZipWithApply {
*
* @param zipper zipping-function from the input values to the output value
*/
def apply[[#A1#], O](zipper: ([#A1#]) ⇒ O): ZipWith1[[#A1#], O] = {
def apply[[#A1#], O](zipper: ([#A1#]) ⇒ O): ZipWith1[[#A1#], O] =
new ZipWith1(zipper)
}
#
]

View file

@ -36,6 +36,7 @@ object Merge {
* '''Cancels when''' downstream cancels
*/
class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
require(inputPorts > 1, "A Merge must have more than 1 input port")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i Inlet[T]("Merge.in" + i))
val out: Outlet[T] = Outlet[T]("Merge.out")
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
@ -139,6 +140,7 @@ object MergePreferred {
* A `Broadcast` has one `in` port and 2 or more `out` ports.
*/
class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports")
override val shape: MergePreferred.MergePreferredShape[T] =
new MergePreferred.MergePreferredShape(secondaryPorts, "MergePreferred")
@ -232,9 +234,8 @@ object Broadcast {
* @param outputPorts number of output ports
* @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel.
*/
def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T] = {
def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T] =
new Broadcast(outputPorts, eagerCancel)
}
}
/**
@ -252,6 +253,7 @@ object Broadcast {
*
*/
class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
require(outputPorts > 1, "A Broadcast must have more than 1 output ports")
val in: Inlet[T] = Inlet[T]("Broadast.in")
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i Outlet[T]("Broadcast.out" + i))
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*)
@ -329,9 +331,8 @@ object Balance {
* elements to downstream outputs until all of them have requested at least one element,
* default value is `false`
*/
def apply[T](outputPorts: Int, waitForAllDownstreams: Boolean = false): Balance[T] = {
def apply[T](outputPorts: Int, waitForAllDownstreams: Boolean = false): Balance[T] =
new Balance(outputPorts, waitForAllDownstreams)
}
}
/**
@ -350,6 +351,7 @@ object Balance {
* '''Cancels when''' all downstreams cancel
*/
class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
require(outputPorts > 1, "A Balance must have more than 1 output ports")
val in: Inlet[T] = Inlet[T]("Balance.in")
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i Outlet[T]("Balance.out" + i))
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*)
@ -469,15 +471,13 @@ object Unzip {
/**
* Create a new `Unzip`.
*/
def apply[A, B](): Unzip[A, B] = {
new Unzip()
}
def apply[A, B](): Unzip[A, B] = new Unzip()
}
/**
* Combine the elements of multiple streams into a stream of the combined elements.
*/
class Unzip[A, B]() extends UnzipWith2[(A, B), A, B](identity) {
class Unzip[A, B]() extends UnzipWith2[(A, B), A, B](ConstantFun.scalaIdentityFunction) {
override def toString = "Unzip"
}
@ -498,7 +498,7 @@ object Concat {
/**
* Create a new `Concat`.
*/
def apply[T](inputCount: Int = 2): Concat[T] = new Concat(inputCount)
def apply[T](inputPorts: Int = 2): Concat[T] = new Concat(inputPorts)
}
/**
@ -516,8 +516,9 @@ object Concat {
*
* '''Cancels when''' downstream cancels
*/
class Concat[T](inputCount: Int) extends GraphStage[UniformFanInShape[T, T]] {
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputCount)(i Inlet[T]("Concat.in" + i))
class Concat[T](inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] {
require(inputPorts > 1, "A Concat must have more than 1 input ports")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i Inlet[T]("Concat.in" + i))
val out: Outlet[T] = Outlet[T]("Concat.out")
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
@ -539,8 +540,8 @@ class Concat[T](inputCount: Int) extends GraphStage[UniformFanInShape[T, T]] {
if (idx == activeStream) {
activeStream += 1
// Skip closed inputs
while (activeStream < inputCount && isClosed(in(activeStream))) activeStream += 1
if (activeStream == inputCount) completeStage()
while (activeStream < inputPorts && isClosed(in(activeStream))) activeStream += 1
if (activeStream == inputPorts) completeStage()
else if (isAvailable(out)) pull(in(activeStream))
}
}