+str #15960 Correction of FlowGraph variance

This commit is contained in:
Patrik Nordwall 2014-09-30 15:40:08 +02:00
parent d89d9abf49
commit bd352df0b2
2 changed files with 56 additions and 4 deletions

View file

@ -6,8 +6,16 @@ package akka.stream.scaladsl2
import akka.stream.testkit.AkkaSpec
import akka.stream.Transformer
import akka.stream.OverflowStrategy
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import akka.stream.testkit.StreamTestKit.PublisherProbe
object FlowGraphCompileSpec {
class Fruit
class Apple extends Fruit
}
class FlowGraphCompileSpec extends AkkaSpec {
import FlowGraphCompileSpec._
implicit val mat = FlowMaterializer()
@ -273,5 +281,49 @@ class FlowGraphCompileSpec extends AkkaSpec {
}.getMessage should include("at most 1 outgoing")
}
"build with variance" in {
val out = SubscriberSink(SubscriberProbe[Fruit]())
FlowGraph { b
val merge = Merge[Fruit]
b.
addEdge(FlowFrom[Fruit](() Some(new Apple)), merge).
addEdge(FlowFrom[Apple](() Some(new Apple)), merge).
addEdge(merge, FlowFrom[Fruit].map(identity), out)
}
}
"build with implicits and variance" in {
PartialFlowGraph { implicit b
val inA = PublisherSource(PublisherProbe[Fruit]())
val inB = PublisherSource(PublisherProbe[Apple]())
val outA = SubscriberSink(SubscriberProbe[Fruit]())
val outB = SubscriberSink(SubscriberProbe[Fruit]())
val merge = Merge[Fruit]
import FlowGraphImplicits._
FlowFrom[Fruit](() Some(new Apple)) ~> merge
FlowFrom[Apple](() Some(new Apple)) ~> merge
inA ~> merge
inB ~> merge
inA ~> FlowFrom[Fruit].map(identity) ~> merge
inB ~> FlowFrom[Apple].map(identity) ~> merge
UndefinedSource[Apple] ~> merge
UndefinedSource[Apple] ~> FlowFrom[Fruit].map(identity) ~> merge
UndefinedSource[Apple] ~> FlowFrom[Apple].map(identity) ~> merge
merge ~> FlowFrom[Fruit].map(identity) ~> outA
FlowFrom[Apple](() Some(new Apple)) ~> Broadcast[Apple] ~> merge
FlowFrom[Apple](() Some(new Apple)) ~> Broadcast[Apple] ~> outB
FlowFrom[Apple](() Some(new Apple)) ~> Broadcast[Apple] ~> UndefinedSink[Fruit]
inB ~> Broadcast[Apple] ~> merge
"UndefinedSource[Fruit] ~> FlowFrom[Apple].map(identity) ~> merge" shouldNot compile
"UndefinedSource[Fruit] ~> Broadcast[Apple]" shouldNot compile
"merge ~> Broadcast[Apple]" shouldNot compile
"merge ~> FlowFrom[Fruit].map(identity) ~> Broadcast[Apple]" shouldNot compile
"inB ~> merge ~> Broadcast[Apple]" shouldNot compile
"inA ~> Broadcast[Apple]" shouldNot compile
}
}
}
}

View file

@ -16,7 +16,7 @@ import akka.stream.impl2.Ast
* Fan-in and fan-out vertices in the [[FlowGraph]] implements
* this marker interface. Edges may end at a `JunctionInPort`.
*/
sealed trait JunctionInPort[T] {
sealed trait JunctionInPort[-T] {
private[akka] def port: Int = FlowGraphInternal.UnlabeledPort
private[akka] def vertex: FlowGraphInternal.Vertex
type NextT
@ -27,7 +27,7 @@ sealed trait JunctionInPort[T] {
* Fan-in and fan-out vertices in the [[FlowGraph]] implements
* this marker interface. Edges may start at a `JunctionOutPort`.
*/
sealed trait JunctionOutPort[T] {
sealed trait JunctionOutPort[+T] {
private[akka] def port: Int = FlowGraphInternal.UnlabeledPort
private[akka] def vertex: FlowGraphInternal.Vertex
}
@ -226,7 +226,7 @@ object UndefinedSink {
* yet by using this placeholder instead of the real [[Sink]]. Later the placeholder can
* be replaced with [[FlowGraphBuilder#attachSink]].
*/
final class UndefinedSink[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
final class UndefinedSink[-T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
override def minimumInputCount: Int = 1
override def maximumInputCount: Int = 1
override def minimumOutputCount: Int = 0
@ -256,7 +256,7 @@ object UndefinedSource {
* yet by using this placeholder instead of the real [[Source]]. Later the placeholder can
* be replaced with [[FlowGraphBuilder#attachSource]].
*/
final class UndefinedSource[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
final class UndefinedSource[+T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
override def minimumInputCount: Int = 0
override def maximumInputCount: Int = 0
override def minimumOutputCount: Int = 1