=str #16997 added variance annotation to UniformFanInShape

This commit is contained in:
Martynas Mickevičius 2015-06-02 14:47:22 +03:00
parent ce68659473
commit 2e32e3a744
5 changed files with 49 additions and 17 deletions

View file

@ -224,6 +224,38 @@ class FlowGraphCompileSpec extends AkkaSpec {
}
}
"build with variance when indices are not specified" in {
FlowGraph.closed() { implicit b
import FlowGraph.Implicits._
val fruitMerge = b.add(Merge[Fruit](2))
Source[Fruit](apples) ~> fruitMerge
Source[Apple](apples) ~> fruitMerge
fruitMerge ~> Sink.head[Fruit]
"fruitMerge ~> Sink.head[Apple]" shouldNot compile
val appleMerge = b.add(Merge[Apple](1))
"Source[Fruit](apples) ~> appleMerge" shouldNot compile
Source[Apple](apples) ~> appleMerge
appleMerge ~> Sink.head[Fruit]
val appleMerge2 = b.add(Merge[Apple](1))
Source[Apple](apples) ~> appleMerge2
appleMerge2 ~> Sink.head[Apple]
val fruitBcast = b.add(Broadcast[Fruit](1))
Source[Fruit](apples) ~> fruitBcast
//Source[Apple](apples) ~> fruitBcast // FIXME: should compile #16997
fruitBcast ~> Sink.head[Fruit]
"fruitBcast ~> Sink.head[Apple]" shouldNot compile
val appleBcast = b.add(Broadcast[Apple](2))
"Source[Fruit](apples) ~> appleBcast" shouldNot compile
Source[Apple](apples) ~> appleBcast
appleBcast ~> Sink.head[Fruit]
appleBcast ~> Sink.head[Apple]
}
}
"build with implicits and variance" in {
FlowGraph.closed() { implicit b
def appleSource = b.add(Source(TestPublisher.manualProbe[Apple]))

View file

@ -125,7 +125,7 @@ object GraphFlexiMergeSpec {
def createMergeLogic(p: PortT) = new MergeLogic[String] {
var throwFromOnComplete = false
override def initialState = State(ReadAny(p.inArray: _*)) {
override def initialState = State(ReadAny(p.inSeq: _*)) {
(ctx, input, element)
if (element == "cancel")
ctx.cancel(input)

View file

@ -14,25 +14,25 @@ object FanInShape {
abstract class FanInShape[O](init: FanInShape.Init[O]) extends Shape {
import FanInShape._
final private[this] val (_out, _registered, _name) = init match {
case Name(name) => (new Outlet[O](s"$name.out"), Nil.iterator, name)
case Ports(o, it) => (o, it.iterator, "FanIn")
}
final def out: Outlet[O] = _out
final override def outlets: immutable.Seq[Outlet[_]] = _out :: Nil
final override def inlets: immutable.Seq[Inlet[_]] = _inlets
private var _inlets: Vector[Inlet[_]] = Vector.empty
protected def newInlet[T](name: String): Inlet[T] = {
val p = if (_registered.hasNext) _registered.next().asInstanceOf[Inlet[T]] else new Inlet[T](s"${_name}.$name")
_inlets :+= p
p
}
protected def construct(init: Init[O]): FanInShape[O]
def deepCopy(): FanInShape[O] = construct(Ports[O](new Outlet(_out.toString), inlets.map(i => new Inlet(i.toString))))
final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanInShape[O] = {
require(outlets.size == 1, s"proposed outlets [${outlets.mkString(", ")}] do not fit FanInShape")
@ -46,15 +46,15 @@ object UniformFanInShape {
new UniformFanInShape(inlets.size, FanInShape.Ports(outlet, inlets.toList))
}
class UniformFanInShape[T, O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) {
class UniformFanInShape[-T, O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) {
def this(n: Int) = this(n, FanInShape.Name("UniformFanIn"))
def this(n: Int, name: String) = this(n, FanInShape.Name(name))
def this(outlet: Outlet[O], inlets: Array[Inlet[T]]) = this(inlets.length, FanInShape.Ports(outlet, inlets.toList))
override protected def construct(init: FanInShape.Init[O]): FanInShape[O] = new UniformFanInShape(n, init)
override def deepCopy(): UniformFanInShape[T, O] = super.deepCopy().asInstanceOf[UniformFanInShape[T, O]]
val inArray: Array[Inlet[T]] = Array.tabulate(n)(i => newInlet[T](s"in$i"))
def in(n: Int): Inlet[T] = inArray(n)
val inSeq: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(n)(i => newInlet[T](s"in$i"))
def in(n: Int): Inlet[T] = inSeq(n)
}
class FanInShape1N[T0, T1, O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) {
@ -77,7 +77,7 @@ class FanInShape1N[T0, T1, O](val n: Int, _init: FanInShape.Init[O]) extends Fan
def this([#in0: Inlet[T0]#], out: Outlet[O]) = this(FanInShape.Ports(out, [#in0# :: ] :: Nil))
override protected def construct(init: FanInShape.Init[O]): FanInShape[O] = new FanInShape1(init)
override def deepCopy(): FanInShape1[[#T0#], O] = super.deepCopy().asInstanceOf[FanInShape1[[#T0#], O]]
[#val in0 = newInlet[T0]("in0")#
]
}#

View file

@ -132,18 +132,18 @@ private[akka] case class ActorFlowMaterializerImpl(
val (props, inputs, output) = fanin match {
case MergeModule(shape, _)
(FairMerge.props(effectiveSettings, shape.inArray.size), shape.inArray.toSeq, shape.out)
(FairMerge.props(effectiveSettings, shape.inSeq.size), shape.inSeq, shape.out)
case f: FlexiMergeModule[t, p]
val flexi = f.flexi(f.shape)
(FlexiMerge.props(effectiveSettings, f.shape, flexi), f.shape.inlets, f.shape.outlets.head)
case MergePreferredModule(shape, _)
(UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inArray.toSeq, shape.out)
(UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inSeq, shape.out)
case ConcatModule(shape, _)
require(shape.inArray.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO
(Concat.props(effectiveSettings), shape.inArray.toSeq, shape.out)
require(shape.inSeq.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO
(Concat.props(effectiveSettings), shape.inSeq, shape.out)
case zip: ZipWithModule
(zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head)

View file

@ -452,7 +452,7 @@ object FlowGraph extends GraphApply {
@tailrec
private[stream] def findIn[I, O](b: Builder[_], junction: UniformFanInShape[I, O], n: Int): Inlet[I] = {
if (n == junction.inArray.length)
if (n == junction.inSeq.length)
throw new IllegalArgumentException(s"no more inlets free on $junction")
else if (b.module.upstreams.contains(junction.in(n))) findIn(b, junction, n + 1)
else junction.in(n)
@ -473,7 +473,7 @@ object FlowGraph extends GraphApply {
def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = {
def bind(n: Int): Unit = {
if (n == junction.inArray.length)
if (n == junction.inSeq.length)
throw new IllegalArgumentException(s"no more inlets free on $junction")
else if (b.module.upstreams.contains(junction.in(n))) bind(n + 1)
else b.addEdge(importAndGetPort(b), junction.in(n))