diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala index 3337f88748..dc190188a1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala @@ -224,6 +224,38 @@ class FlowGraphCompileSpec extends AkkaSpec { } } + "build with variance when indices are not specified" in { + FlowGraph.closed() { implicit b ⇒ + import FlowGraph.Implicits._ + val fruitMerge = b.add(Merge[Fruit](2)) + Source[Fruit](apples) ~> fruitMerge + Source[Apple](apples) ~> fruitMerge + fruitMerge ~> Sink.head[Fruit] + "fruitMerge ~> Sink.head[Apple]" shouldNot compile + + val appleMerge = b.add(Merge[Apple](1)) + "Source[Fruit](apples) ~> appleMerge" shouldNot compile + Source[Apple](apples) ~> appleMerge + appleMerge ~> Sink.head[Fruit] + + val appleMerge2 = b.add(Merge[Apple](1)) + Source[Apple](apples) ~> appleMerge2 + appleMerge2 ~> Sink.head[Apple] + + val fruitBcast = b.add(Broadcast[Fruit](1)) + Source[Fruit](apples) ~> fruitBcast + //Source[Apple](apples) ~> fruitBcast // FIXME: should compile #16997 + fruitBcast ~> Sink.head[Fruit] + "fruitBcast ~> Sink.head[Apple]" shouldNot compile + + val appleBcast = b.add(Broadcast[Apple](2)) + "Source[Fruit](apples) ~> appleBcast" shouldNot compile + Source[Apple](apples) ~> appleBcast + appleBcast ~> Sink.head[Fruit] + appleBcast ~> Sink.head[Apple] + } + } + "build with implicits and variance" in { FlowGraph.closed() { implicit b ⇒ def appleSource = b.add(Source(TestPublisher.manualProbe[Apple])) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala index ef8ab3a806..e7af764339 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala @@ -125,7 +125,7 @@ object GraphFlexiMergeSpec { def createMergeLogic(p: PortT) = new MergeLogic[String] { var throwFromOnComplete = false - override def initialState = State(ReadAny(p.inArray: _*)) { + override def initialState = State(ReadAny(p.inSeq: _*)) { (ctx, input, element) ⇒ if (element == "cancel") ctx.cancel(input) diff --git a/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template b/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template index 7169a1d466..21b8cf70c4 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template @@ -14,25 +14,25 @@ object FanInShape { abstract class FanInShape[O](init: FanInShape.Init[O]) extends Shape { import FanInShape._ - + final private[this] val (_out, _registered, _name) = init match { case Name(name) => (new Outlet[O](s"$name.out"), Nil.iterator, name) case Ports(o, it) => (o, it.iterator, "FanIn") } - + final def out: Outlet[O] = _out final override def outlets: immutable.Seq[Outlet[_]] = _out :: Nil final override def inlets: immutable.Seq[Inlet[_]] = _inlets - + private var _inlets: Vector[Inlet[_]] = Vector.empty protected def newInlet[T](name: String): Inlet[T] = { val p = if (_registered.hasNext) _registered.next().asInstanceOf[Inlet[T]] else new Inlet[T](s"${_name}.$name") _inlets :+= p p } - + protected def construct(init: Init[O]): FanInShape[O] - + def deepCopy(): FanInShape[O] = construct(Ports[O](new Outlet(_out.toString), inlets.map(i => new Inlet(i.toString)))) final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanInShape[O] = { require(outlets.size == 1, s"proposed outlets [${outlets.mkString(", ")}] do not fit FanInShape") @@ -46,15 +46,15 @@ object UniformFanInShape { new UniformFanInShape(inlets.size, FanInShape.Ports(outlet, inlets.toList)) } -class UniformFanInShape[T, O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { +class UniformFanInShape[-T, O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { def this(n: Int) = this(n, FanInShape.Name("UniformFanIn")) def this(n: Int, name: String) = this(n, FanInShape.Name(name)) def this(outlet: Outlet[O], inlets: Array[Inlet[T]]) = this(inlets.length, FanInShape.Ports(outlet, inlets.toList)) override protected def construct(init: FanInShape.Init[O]): FanInShape[O] = new UniformFanInShape(n, init) override def deepCopy(): UniformFanInShape[T, O] = super.deepCopy().asInstanceOf[UniformFanInShape[T, O]] - - val inArray: Array[Inlet[T]] = Array.tabulate(n)(i => newInlet[T](s"in$i")) - def in(n: Int): Inlet[T] = inArray(n) + + val inSeq: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(n)(i => newInlet[T](s"in$i")) + def in(n: Int): Inlet[T] = inSeq(n) } class FanInShape1N[T0, T1, O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { @@ -77,7 +77,7 @@ class FanInShape1N[T0, T1, O](val n: Int, _init: FanInShape.Init[O]) extends Fan def this([#in0: Inlet[T0]#], out: Outlet[O]) = this(FanInShape.Ports(out, [#in0# :: ] :: Nil)) override protected def construct(init: FanInShape.Init[O]): FanInShape[O] = new FanInShape1(init) override def deepCopy(): FanInShape1[[#T0#], O] = super.deepCopy().asInstanceOf[FanInShape1[[#T0#], O]] - + [#val in0 = newInlet[T0]("in0")# ] }# diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index 8dbce9c9d9..daf87ae023 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -132,18 +132,18 @@ private[akka] case class ActorFlowMaterializerImpl( val (props, inputs, output) = fanin match { case MergeModule(shape, _) ⇒ - (FairMerge.props(effectiveSettings, shape.inArray.size), shape.inArray.toSeq, shape.out) + (FairMerge.props(effectiveSettings, shape.inSeq.size), shape.inSeq, shape.out) case f: FlexiMergeModule[t, p] ⇒ val flexi = f.flexi(f.shape) (FlexiMerge.props(effectiveSettings, f.shape, flexi), f.shape.inlets, f.shape.outlets.head) case MergePreferredModule(shape, _) ⇒ - (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inArray.toSeq, shape.out) + (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inSeq, shape.out) case ConcatModule(shape, _) ⇒ - require(shape.inArray.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO - (Concat.props(effectiveSettings), shape.inArray.toSeq, shape.out) + require(shape.inSeq.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO + (Concat.props(effectiveSettings), shape.inSeq, shape.out) case zip: ZipWithModule ⇒ (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 7d55055785..c509649609 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -452,7 +452,7 @@ object FlowGraph extends GraphApply { @tailrec private[stream] def findIn[I, O](b: Builder[_], junction: UniformFanInShape[I, O], n: Int): Inlet[I] = { - if (n == junction.inArray.length) + if (n == junction.inSeq.length) throw new IllegalArgumentException(s"no more inlets free on $junction") else if (b.module.upstreams.contains(junction.in(n))) findIn(b, junction, n + 1) else junction.in(n) @@ -473,7 +473,7 @@ object FlowGraph extends GraphApply { def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = { def bind(n: Int): Unit = { - if (n == junction.inArray.length) + if (n == junction.inSeq.length) throw new IllegalArgumentException(s"no more inlets free on $junction") else if (b.module.upstreams.contains(junction.in(n))) bind(n + 1) else b.addEdge(importAndGetPort(b), junction.in(n))