From bd352df0b2210dd4b7e9cae1d60d97d9218081da Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 30 Sep 2014 15:40:08 +0200 Subject: [PATCH] +str #15960 Correction of FlowGraph variance --- .../scaladsl2/FlowGraphCompileSpec.scala | 52 +++++++++++++++++++ .../akka/stream/scaladsl2/FlowGraph.scala | 8 +-- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala index 6c4e8e1cc4..1018e3d11c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala @@ -6,8 +6,16 @@ package akka.stream.scaladsl2 import akka.stream.testkit.AkkaSpec import akka.stream.Transformer import akka.stream.OverflowStrategy +import akka.stream.testkit.StreamTestKit.SubscriberProbe +import akka.stream.testkit.StreamTestKit.PublisherProbe + +object FlowGraphCompileSpec { + class Fruit + class Apple extends Fruit +} class FlowGraphCompileSpec extends AkkaSpec { + import FlowGraphCompileSpec._ implicit val mat = FlowMaterializer() @@ -273,5 +281,49 @@ class FlowGraphCompileSpec extends AkkaSpec { }.getMessage should include("at most 1 outgoing") } + "build with variance" in { + val out = SubscriberSink(SubscriberProbe[Fruit]()) + FlowGraph { b ⇒ + val merge = Merge[Fruit] + b. + addEdge(FlowFrom[Fruit](() ⇒ Some(new Apple)), merge). + addEdge(FlowFrom[Apple](() ⇒ Some(new Apple)), merge). + addEdge(merge, FlowFrom[Fruit].map(identity), out) + } + } + + "build with implicits and variance" in { + PartialFlowGraph { implicit b ⇒ + val inA = PublisherSource(PublisherProbe[Fruit]()) + val inB = PublisherSource(PublisherProbe[Apple]()) + val outA = SubscriberSink(SubscriberProbe[Fruit]()) + val outB = SubscriberSink(SubscriberProbe[Fruit]()) + val merge = Merge[Fruit] + import FlowGraphImplicits._ + FlowFrom[Fruit](() ⇒ Some(new Apple)) ~> merge + FlowFrom[Apple](() ⇒ Some(new Apple)) ~> merge + inA ~> merge + inB ~> merge + inA ~> FlowFrom[Fruit].map(identity) ~> merge + inB ~> FlowFrom[Apple].map(identity) ~> merge + UndefinedSource[Apple] ~> merge + UndefinedSource[Apple] ~> FlowFrom[Fruit].map(identity) ~> merge + UndefinedSource[Apple] ~> FlowFrom[Apple].map(identity) ~> merge + merge ~> FlowFrom[Fruit].map(identity) ~> outA + + FlowFrom[Apple](() ⇒ Some(new Apple)) ~> Broadcast[Apple] ~> merge + FlowFrom[Apple](() ⇒ Some(new Apple)) ~> Broadcast[Apple] ~> outB + FlowFrom[Apple](() ⇒ Some(new Apple)) ~> Broadcast[Apple] ~> UndefinedSink[Fruit] + inB ~> Broadcast[Apple] ~> merge + + "UndefinedSource[Fruit] ~> FlowFrom[Apple].map(identity) ~> merge" shouldNot compile + "UndefinedSource[Fruit] ~> Broadcast[Apple]" shouldNot compile + "merge ~> Broadcast[Apple]" shouldNot compile + "merge ~> FlowFrom[Fruit].map(identity) ~> Broadcast[Apple]" shouldNot compile + "inB ~> merge ~> Broadcast[Apple]" shouldNot compile + "inA ~> Broadcast[Apple]" shouldNot compile + } + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index 90bdeb1d55..ba901bec8f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -16,7 +16,7 @@ import akka.stream.impl2.Ast * Fan-in and fan-out vertices in the [[FlowGraph]] implements * this marker interface. Edges may end at a `JunctionInPort`. */ -sealed trait JunctionInPort[T] { +sealed trait JunctionInPort[-T] { private[akka] def port: Int = FlowGraphInternal.UnlabeledPort private[akka] def vertex: FlowGraphInternal.Vertex type NextT @@ -27,7 +27,7 @@ sealed trait JunctionInPort[T] { * Fan-in and fan-out vertices in the [[FlowGraph]] implements * this marker interface. Edges may start at a `JunctionOutPort`. */ -sealed trait JunctionOutPort[T] { +sealed trait JunctionOutPort[+T] { private[akka] def port: Int = FlowGraphInternal.UnlabeledPort private[akka] def vertex: FlowGraphInternal.Vertex } @@ -226,7 +226,7 @@ object UndefinedSink { * yet by using this placeholder instead of the real [[Sink]]. Later the placeholder can * be replaced with [[FlowGraphBuilder#attachSink]]. */ -final class UndefinedSink[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { +final class UndefinedSink[-T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { override def minimumInputCount: Int = 1 override def maximumInputCount: Int = 1 override def minimumOutputCount: Int = 0 @@ -256,7 +256,7 @@ object UndefinedSource { * yet by using this placeholder instead of the real [[Source]]. Later the placeholder can * be replaced with [[FlowGraphBuilder#attachSource]]. */ -final class UndefinedSource[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { +final class UndefinedSource[+T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { override def minimumInputCount: Int = 0 override def maximumInputCount: Int = 0 override def minimumOutputCount: Int = 1