make FlowOps(Mat).Repr(Mat) safe and precise

This commit is contained in:
Roland Kuhn 2015-12-30 21:15:04 +01:00
parent cf1a5611a8
commit ec3858e893
4 changed files with 45 additions and 25 deletions

View file

@ -22,7 +22,7 @@ class FlowCompileSpec extends AkkaSpec {
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system))
"Flow" should {
"should not run" in {
"not run" in {
val open: Flow[Int, Int, _] = Flow[Int]
"open.run()" shouldNot compile
}
@ -52,7 +52,7 @@ class FlowCompileSpec extends AkkaSpec {
val closedSink: Sink[String, _] = Flow[String].map(_.hashCode).to(Sink.asPublisher[Int](false))
val appended: Sink[Int, _] = open.to(closedSink)
"appended.run()" shouldNot compile
"appended.connect(Sink.head[Int])" shouldNot compile
"appended.to(Sink.head[Int])" shouldNot compile
intSeq.to(appended).run
}
"be appended to Source" in {
@ -60,22 +60,22 @@ class FlowCompileSpec extends AkkaSpec {
val closedSource: Source[Int, _] = strSeq.via(Flow[String].map(_.hashCode))
val closedSource2: Source[String, _] = closedSource.via(open)
"closedSource2.run()" shouldNot compile
"strSeq.connect(closedSource2)" shouldNot compile
"strSeq.to(closedSource2)" shouldNot compile
closedSource2.to(Sink.asPublisher[String](false)).run
}
}
"Sink" should {
val openSource: Sink[Int, _] =
val openSink: Sink[Int, _] =
Flow[Int].map(_.toString).to(Sink.asPublisher[String](false))
"accept Source" in {
intSeq.to(openSource)
intSeq.to(openSink)
}
"not accept Sink" in {
"openSource.connect(Sink.head[String])" shouldNot compile
"openSink.to(Sink.head[String])" shouldNot compile
}
"not run()" in {
"openSource.run()" shouldNot compile
"openSink.run()" shouldNot compile
}
}
@ -86,7 +86,7 @@ class FlowCompileSpec extends AkkaSpec {
openSource.to(Sink.asPublisher[String](false))
}
"not be accepted by Source" in {
"openSource.connect(intSeq)" shouldNot compile
"openSource.to(intSeq)" shouldNot compile
}
"not run()" in {
"openSource.run()" shouldNot compile
@ -101,11 +101,29 @@ class FlowCompileSpec extends AkkaSpec {
closed.run()
}
"not be accepted by Source" in {
"intSeq.connect(closed)" shouldNot compile
"intSeq.to(closed)" shouldNot compile
}
"not accept Sink" in {
"closed.connect(Sink.head[String])" shouldNot compile
"closed.to(Sink.head[String])" shouldNot compile
}
}
"FlowOps" should {
"be extensible" in {
val f: FlowOps[Int, Unit] { type Closed = Sink[Int, Unit] } = Flow[Int]
val fm = f.map(identity)
val f2: FlowOps[Int, Unit] = fm
val s: Sink[Int, Unit] = fm.to(Sink.ignore)
}
"be extensible (with MaterializedValue)" in {
val f: FlowOpsMat[Int, Unit] { type ClosedMat[+M] = Sink[Int, M] } = Flow[Int]
val fm = f.map(identity).concatMat(Source.empty)(Keep.both)
// this asserts only the FlowOpsMat part of the signature, but fm also carries the
// CloseMat type without which `.to(sink)` does not work
val f2: FlowOpsMat[Int, (Unit, Unit)] = fm
val s: Sink[Int, (Unit, Unit)] = fm.to(Sink.ignore)
}
}
}

View file

@ -5,7 +5,6 @@ package akka.stream.impl
import akka.stream._
import akka.stream.scaladsl._
import language.higherKinds
object SubFlowImpl {
@ -37,5 +36,4 @@ class SubFlowImpl[In, Out, Mat, F[+_], C](val subFlow: Flow[In, Out, Unit],
override def mergeSubstreamsWithParallelism(breadth: Int): F[Out] = mergeBackFunction(subFlow, breadth)
def to[M](sink: Graph[SinkShape[Out], M]): C = finishFunction(subFlow.to(sink))
}

View file

@ -349,20 +349,14 @@ trait FlowOps[+Out, +Mat] {
import akka.stream.impl.Stages._
import GraphDSL.Implicits._
type Repr[+O] <: FlowOps[O, Mat]
type Repr[+O] <: FlowOps[O, Mat] {
type Repr[+OO] = FlowOps.this.Repr[OO]
type Closed = FlowOps.this.Closed
}
// result of closing a Source is RunnableGraph, closing a Flow is Sink
type Closed
/*
* Repr is actually self-bounded, but that would be a cyclic type declaration that is illegal in Scala.
* Therefore we need to help the compiler by specifying that Repr expressions can be flattened.
*/
import language.implicitConversions
private implicit def reprFlatten0[O1](r: Repr[O1]#Closed): Closed = r.asInstanceOf[Closed]
private implicit def reprFlatten1[O1, O2](r: Repr[O1]#Repr[O2]): Repr[O2] = r.asInstanceOf[Repr[O2]]
private implicit def reprFlatten2[O1, O2, O3](r: Repr[O1]#Repr[O2]#Repr[O3]): Repr[O3] = r.asInstanceOf[Repr[O3]]
/**
* Transform this [[Flow]] by appending the given processing steps.
* {{{
@ -1634,7 +1628,18 @@ trait FlowOps[+Out, +Mat] {
*/
trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
type ReprMat[+O, +M] <: FlowOpsMat[O, M]
type Repr[+O] <: ReprMat[O, Mat] {
type Repr[+OO] = FlowOpsMat.this.Repr[OO]
type ReprMat[+OO, +MM] = FlowOpsMat.this.ReprMat[OO, MM]
type Closed = FlowOpsMat.this.Closed
type ClosedMat[+M] = FlowOpsMat.this.ClosedMat[M]
}
type ReprMat[+O, +M] <: FlowOpsMat[O, M] {
type Repr[+OO] = FlowOpsMat.this.ReprMat[OO, M @uncheckedVariance]
type ReprMat[+OO, +MM] = FlowOpsMat.this.ReprMat[OO, MM]
type Closed = FlowOpsMat.this.ClosedMat[M @uncheckedVariance]
type ClosedMat[+MM] = FlowOpsMat.this.ClosedMat[MM]
}
type ClosedMat[+M] <: Graph[_, M]
/**

View file

@ -922,10 +922,9 @@ object GraphDSL extends GraphApply {
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Closed =
super.~>(sink)(b)
}
private class DisabledPortOps[Out, Mat](msg: String) extends PortOpsImpl[Out](null, null) {
private class DisabledPortOps[Out](msg: String) extends PortOpsImpl[Out](null, null) {
override def importAndGetPort(b: Builder[_]): Outlet[Out] = throw new IllegalArgumentException(msg)
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] =