diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala index 54356b41d0..d470f268d7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala @@ -22,7 +22,7 @@ class FlowCompileSpec extends AkkaSpec { implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system)) "Flow" should { - "should not run" in { + "not run" in { val open: Flow[Int, Int, _] = Flow[Int] "open.run()" shouldNot compile } @@ -52,7 +52,7 @@ class FlowCompileSpec extends AkkaSpec { val closedSink: Sink[String, _] = Flow[String].map(_.hashCode).to(Sink.asPublisher[Int](false)) val appended: Sink[Int, _] = open.to(closedSink) "appended.run()" shouldNot compile - "appended.connect(Sink.head[Int])" shouldNot compile + "appended.to(Sink.head[Int])" shouldNot compile intSeq.to(appended).run } "be appended to Source" in { @@ -60,22 +60,22 @@ class FlowCompileSpec extends AkkaSpec { val closedSource: Source[Int, _] = strSeq.via(Flow[String].map(_.hashCode)) val closedSource2: Source[String, _] = closedSource.via(open) "closedSource2.run()" shouldNot compile - "strSeq.connect(closedSource2)" shouldNot compile + "strSeq.to(closedSource2)" shouldNot compile closedSource2.to(Sink.asPublisher[String](false)).run } } "Sink" should { - val openSource: Sink[Int, _] = + val openSink: Sink[Int, _] = Flow[Int].map(_.toString).to(Sink.asPublisher[String](false)) "accept Source" in { - intSeq.to(openSource) + intSeq.to(openSink) } "not accept Sink" in { - "openSource.connect(Sink.head[String])" shouldNot compile + "openSink.to(Sink.head[String])" shouldNot compile } "not run()" in { - "openSource.run()" shouldNot compile + "openSink.run()" shouldNot compile } } @@ -86,7 +86,7 @@ class FlowCompileSpec extends AkkaSpec { openSource.to(Sink.asPublisher[String](false)) } "not be accepted by Source" in { - "openSource.connect(intSeq)" shouldNot compile + "openSource.to(intSeq)" shouldNot compile } "not run()" in { "openSource.run()" shouldNot compile @@ -101,11 +101,29 @@ class FlowCompileSpec extends AkkaSpec { closed.run() } "not be accepted by Source" in { - "intSeq.connect(closed)" shouldNot compile + "intSeq.to(closed)" shouldNot compile } "not accept Sink" in { - "closed.connect(Sink.head[String])" shouldNot compile + "closed.to(Sink.head[String])" shouldNot compile + } + } + + "FlowOps" should { + "be extensible" in { + val f: FlowOps[Int, Unit] { type Closed = Sink[Int, Unit] } = Flow[Int] + val fm = f.map(identity) + val f2: FlowOps[Int, Unit] = fm + val s: Sink[Int, Unit] = fm.to(Sink.ignore) + } + + "be extensible (with MaterializedValue)" in { + val f: FlowOpsMat[Int, Unit] { type ClosedMat[+M] = Sink[Int, M] } = Flow[Int] + val fm = f.map(identity).concatMat(Source.empty)(Keep.both) + // this asserts only the FlowOpsMat part of the signature, but fm also carries the + // CloseMat type without which `.to(sink)` does not work + val f2: FlowOpsMat[Int, (Unit, Unit)] = fm + val s: Sink[Int, (Unit, Unit)] = fm.to(Sink.ignore) } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala index 9dbbbdd849..3dd63c0d4c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala @@ -5,7 +5,6 @@ package akka.stream.impl import akka.stream._ import akka.stream.scaladsl._ - import language.higherKinds object SubFlowImpl { @@ -37,5 +36,4 @@ class SubFlowImpl[In, Out, Mat, F[+_], C](val subFlow: Flow[In, Out, Unit], override def mergeSubstreamsWithParallelism(breadth: Int): F[Out] = mergeBackFunction(subFlow, breadth) def to[M](sink: Graph[SinkShape[Out], M]): C = finishFunction(subFlow.to(sink)) - } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 7da680c159..d9d4aaa582 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -349,20 +349,14 @@ trait FlowOps[+Out, +Mat] { import akka.stream.impl.Stages._ import GraphDSL.Implicits._ - type Repr[+O] <: FlowOps[O, Mat] + type Repr[+O] <: FlowOps[O, Mat] { + type Repr[+OO] = FlowOps.this.Repr[OO] + type Closed = FlowOps.this.Closed + } // result of closing a Source is RunnableGraph, closing a Flow is Sink type Closed - /* - * Repr is actually self-bounded, but that would be a cyclic type declaration that is illegal in Scala. - * Therefore we need to help the compiler by specifying that Repr expressions can be flattened. - */ - import language.implicitConversions - private implicit def reprFlatten0[O1](r: Repr[O1]#Closed): Closed = r.asInstanceOf[Closed] - private implicit def reprFlatten1[O1, O2](r: Repr[O1]#Repr[O2]): Repr[O2] = r.asInstanceOf[Repr[O2]] - private implicit def reprFlatten2[O1, O2, O3](r: Repr[O1]#Repr[O2]#Repr[O3]): Repr[O3] = r.asInstanceOf[Repr[O3]] - /** * Transform this [[Flow]] by appending the given processing steps. * {{{ @@ -1634,7 +1628,18 @@ trait FlowOps[+Out, +Mat] { */ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { - type ReprMat[+O, +M] <: FlowOpsMat[O, M] + type Repr[+O] <: ReprMat[O, Mat] { + type Repr[+OO] = FlowOpsMat.this.Repr[OO] + type ReprMat[+OO, +MM] = FlowOpsMat.this.ReprMat[OO, MM] + type Closed = FlowOpsMat.this.Closed + type ClosedMat[+M] = FlowOpsMat.this.ClosedMat[M] + } + type ReprMat[+O, +M] <: FlowOpsMat[O, M] { + type Repr[+OO] = FlowOpsMat.this.ReprMat[OO, M @uncheckedVariance] + type ReprMat[+OO, +MM] = FlowOpsMat.this.ReprMat[OO, MM] + type Closed = FlowOpsMat.this.ClosedMat[M @uncheckedVariance] + type ClosedMat[+MM] = FlowOpsMat.this.ClosedMat[MM] + } type ClosedMat[+M] <: Graph[_, M] /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 1a2d07037c..84a3ad9478 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -922,10 +922,9 @@ object GraphDSL extends GraphApply { def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Closed = super.~>(sink)(b) - } - private class DisabledPortOps[Out, Mat](msg: String) extends PortOpsImpl[Out](null, null) { + private class DisabledPortOps[Out](msg: String) extends PortOpsImpl[Out](null, null) { override def importAndGetPort(b: Builder[_]): Outlet[Out] = throw new IllegalArgumentException(msg) override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] =