From 7b232600d8596a7585785076b394c06336015dae Mon Sep 17 00:00:00 2001 From: Martynas Mickevicius Date: Mon, 18 Aug 2014 11:28:30 +0300 Subject: [PATCH 1/4] !str #15672 shallow shim of the new Streams DSL * mostly Flow API * initial Graph API * positive and negative compilation tests --- .../src/main/scala/akka/stream/dsl/Flow.scala | 147 +++++++++++++++++ .../main/scala/akka/stream/dsl/Graph.scala | 21 +++ .../test/scala/akka/stream/dsl/FlowSpec.scala | 149 ++++++++++++++++++ .../scala/akka/stream/dsl/GraphSpec.scala | 37 +++++ 4 files changed, 354 insertions(+) create mode 100644 akka-stream/src/main/scala/akka/stream/dsl/Flow.scala create mode 100644 akka-stream/src/main/scala/akka/stream/dsl/Graph.scala create mode 100644 akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala create mode 100644 akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala new file mode 100644 index 0000000000..7f731baf63 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala @@ -0,0 +1,147 @@ +package akka.stream.dsl + +import scala.collection.immutable.Iterable +import scala.concurrent.Future + +trait Flow[-In, +Out] { + val transform: Transform[In, Out] +} + +object From { + /** + * Helper to create Flow without Input. + * Example usage: From[Int] + */ + def apply[T]: OpenFlow[T, T] = OpenFlow[T, T](EmptyTransform[T, T]()) + + /** + * Helper to create Flow with Input from Iterable. + * Example usage: Flow(Seq(1,2,3)) + */ + def apply[T](i: Iterable[T]): OpenOutputFlow[T, T] = From[T].withInput(IterableIn(i)) + + /** + * Helper to create Flow with Input from Future. + * Example usage: Flow(Future { 1 }) + */ + def apply[T](f: Future[T]): OpenOutputFlow[T, T] = From[T].withInput(FutureIn(f)) +} + +trait Input[-In] + +/** + * Input from Iterable + * + * Changing In from Contravariant to Covariant is needed because Iterable[+A]. + * But this brakes IterableIn variance and we get IterableIn(Seq(1,2,3)): IterableIn[Any] + */ +case class IterableIn[-In](i: Iterable[_ >: In]) extends Input[In] + +/** + * Input from Future + * + * Changing In from Contravariant to Covariant is needed because Future[+A]. + * But this brakes FutureIn variance and we get FutureIn(Future{1}): FutureIn[Any] + */ +case class FutureIn[-In](f: Future[_ >: In]) extends Input[In] + +trait Output[+Out] + +case class FutureOut[+Out]() extends Output[Out] +case class PublisherOut[+Out]() extends Output[Out] + +/** + * Operations with a Flow which has open (no attached) Input. + * + * No Out type parameter would be useful for Graph signatures, but we need it here + * for `withInput` and `prependTransform` methods. + */ +sealed trait HasOpenInput[-In, +Out] extends Flow[In, Out] { + type Repr[-I, +O] <: HasOpenInput[I, O] + type AfterCloseInput[-In, +Out] <: Flow[In, Out] + + def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] + protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] + + // linear combinators with flows + def prepend[T](f: OpenFlow[T, In]) = + prependTransform(f.transform) + def prependClosed[T](f: OpenOutputFlow[T, In]) = + prependTransform(f.transform).withInput(f.input) +} + +/** + * Operations with a Flow which has open (no attached) Output. + * + * No In type parameter would be useful for Graph signatures, but we need it here + * for `withOutput` and `appendTransform` methods. + */ +trait HasOpenOutput[-In, +Out] extends Flow[In, Out] { + type Repr[-I, +O] <: HasOpenOutput[I, O] + type AfterCloseOutput[-In, +Out] <: Flow[In, Out] + + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] + protected def appendTransform[I <: In, T](t: Transform[I, T]): Repr[I, T] + + // linear simple combinators + def map[T](f: Out ⇒ T) = appendTransform(transform ++ EmptyTransform[Out, T]()) + def filter(p: Out ⇒ Boolean) = appendTransform(transform ++ EmptyTransform[Out, Out]()) + + // linear combinator which produce multiple flows (is this still linear? move to graph?) + def groupBy[O >: Out, K](f: O ⇒ K): Repr[In, (K, OpenOutputFlow[O, O])] = + appendTransform(transform ++ EmptyTransform[Out, (K, OpenOutputFlow[O, O])]()) + + // linear combinators with flows + def append[T](f: OpenFlow[Out, T]) = + appendTransform(transform ++ f.transform) + def appendClosed[T](f: OpenInputFlow[Out, T]) = + appendTransform(transform ++ f.transform).withOutput(f.output) + + // terminal combinators + def ToFuture = withOutput(FutureOut()) + def ToPublisher = withOutput(PublisherOut()) +} + +case class OpenFlow[-In, +Out](transform: Transform[In, Out]) extends HasOpenOutput[In, Out] with HasOpenInput[In, Out] { + override type Repr[-I, +O] = OpenFlow[I, O] + type AfterCloseOutput[-I, +O] = OpenInputFlow[I, O] + type AfterCloseInput[-I, +O] = OpenOutputFlow[I, O] + + def withOutput[O >: Out](out: Output[O]) = OpenInputFlow(out, transform) + def withInput[I <: In](in: Input[I]) = OpenOutputFlow(in, transform) + + protected def prependTransform[T](t: Transform[T, In]) = OpenFlow(t ++ transform) + protected def appendTransform[I <: In, T](t: Transform[I, T]) = OpenFlow(t) +} + +case class OpenInputFlow[-In, +Out](output: Output[Out], transform: Transform[In, Out]) extends HasOpenInput[In, Out] { + type Repr[-I, +O] = OpenInputFlow[I, O] + type AfterCloseInput[-I, +O] = ClosedFlow[I, O] + + def withoutOutput = OpenFlow(transform) + def withInput[I <: In](in: Input[I]) = ClosedFlow(in, output, transform) + + protected def prependTransform[T](t: Transform[T, In]) = OpenInputFlow(output, t ++ transform) +} + +case class OpenOutputFlow[-In, +Out](input: Input[In], transform: Transform[In, Out]) extends HasOpenOutput[In, Out] { + override type Repr[-I, +O] = OpenOutputFlow[I, O] + type AfterCloseOutput[-I, +O] = ClosedFlow[I, O] + + def withOutput[O >: Out](out: Output[O]) = ClosedFlow(input, out, transform) + def withoutInput = OpenFlow(transform) + + protected def appendTransform[I <: In, T](t: Transform[I, T]) = OpenOutputFlow(input, t) +} + +case class ClosedFlow[-In, +Out](input: Input[In], output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] { + def withoutOutput = OpenOutputFlow(input, transform) + def withoutInput = OpenInputFlow(output, transform) + + def run(): Unit = () +} + +trait Transform[-In, +Out] { + def ++[T](t: Transform[Out, T]): Transform[In, T] = EmptyTransform[In, T]() +} +case class EmptyTransform[-In, +Out]() extends Transform[In, Out] diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala new file mode 100644 index 0000000000..02ea979fef --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala @@ -0,0 +1,21 @@ +package akka.stream.dsl + +/** + * Dummy implementation needed for runtime tests. + */ +trait Graph { + def broadcast[T](source: HasOpenOutput[_, T], destinations: Seq[HasOpenInput[T, _]]) = () + def zip[T, U](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, U], destination: HasOpenInput[(T, U), _]) = () + + /** + * Whenever a new K value is returned from `f`, a new `dest` flow will be materialized. + * + * The discriminating value `K` of the output flow can be known by applying `f` on any + * element in the output flows. + */ + def groupBy[T, K](source: HasOpenOutput[_, T], f: T ⇒ K, dest: HasOpenInput[T, _]) = () +} + +object Graph { + def apply(): Graph = new Graph {} +} diff --git a/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala new file mode 100644 index 0000000000..a01efdb55b --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala @@ -0,0 +1,149 @@ +package akka.stream.dsl + +import org.scalatest.{ Matchers, WordSpec } + +import scala.collection.immutable.Seq +import scala.concurrent.Future + +class FlowSpec extends WordSpec with Matchers { + + val intSeq = IterableIn(Seq(1, 2, 3)) + val strSeq = IterableIn(Seq("a", "b", "c")) + + import scala.concurrent.ExecutionContext.Implicits.global + val intFut = FutureIn(Future { 3 }) + + "OpenFlow" should { + "go through all states" in { + val f = From[Int] + .withInput(intSeq) + .withOutput(FutureOut()) + .withoutInput + .withoutOutput + } + "should not run" in { + val open = From[Int] + "open.run" shouldNot compile + } + "accept IterableIn" in { + val f = From[Int].withInput(intSeq) + } + "accept FutureIn" in { + val f = From[Int].withInput(intFut) + } + "append OpenFlow" in { + val open1 = From[Int].map(_.toString) + val open2 = From[String].map(_.hashCode) + val open3 = open1.append(open2) + "open3.run" shouldNot compile + + val closedInput = open3.withInput(intSeq) + "closedInput.run" shouldNot compile + + val closedOutput = open3.ToFuture + "closedOutput.run" shouldNot compile + + closedInput.ToFuture.run + closedOutput.withInput(intSeq).run + } + "prepend OpenFlow" in { + val open1 = From[Int].map(_.toString) + val open2 = From[String].map(_.hashCode) + val open3 = open1.prepend(open2) + "open3.run" shouldNot compile + + val closedInput = open3.withInput(strSeq) + "closedInput.run" shouldNot compile + + val closedOutput = open3.ToFuture + "closedOutput.run" shouldNot compile + + closedInput.ToFuture.run + closedOutput.withInput(strSeq).run + } + "append OpenInputFlow" in { + val open = From[Int].map(_.toString) + val closed = From[String].map(_.hashCode).ToFuture + val appended = open.appendClosed(closed) + "appended.run" shouldNot compile + "appended.toFuture" shouldNot compile + appended.withInput(intSeq).run + } + "prepend OpenOutputFlow" in { + val open = From[Int].map(_.toString) + val closed = From[String].map(_.hashCode).withInput(strSeq) + val appended = open.prependClosed(closed) + "appended.run" shouldNot compile + "appended.withInput(strSeq)" shouldNot compile + appended.ToFuture.run + } + "groupBy" in { + val grouped = From(Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o % 3) + + val closedInner = grouped.map { + case (key, openFlow) ⇒ (key, openFlow.ToFuture) + } + + // both of these compile, even if `grouped` has inner flows unclosed + grouped.ToFuture.run + closedInner.ToFuture.run + } + } + + "OpenInputFlow" should { + val openInput = From[Int].map(_.toString).ToFuture + "accept Input" in { + openInput.withInput(intSeq) + } + "drop Output" in { + openInput.withoutOutput + } + "not drop Input" in { + "openInput.withoutInput" shouldNot compile + } + "not accept Output" in { + "openInput.ToFuture" shouldNot compile + } + "not run" in { + "openInput.run" shouldNot compile + } + } + + "OpenOutputFlow" should { + val openOutput = From(Seq(1, 2, 3)).map(_.toString) + "accept Output" in { + openOutput.ToFuture + } + "drop Input" in { + openOutput.withoutInput + } + "not drop Output" in { + "openOutput.withoutOutput" shouldNot compile + } + "not accept Input" in { + "openOutput.withInput(intSeq)" shouldNot compile + } + "not run" in { + "openOutput.run" shouldNot compile + } + } + + "ClosedFlow" should { + val closed = From(Seq(1, 2, 3)).map(_.toString).ToFuture + "run" in { + closed.run + } + "drop Input" in { + closed.withoutInput + } + "drop Output" in { + closed.withoutOutput + } + "not accept Input" in { + "closed.withInput(intSeq)" shouldNot compile + } + "not accept Output" in { + "closed.ToFuture" shouldNot compile + } + } +} diff --git a/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala new file mode 100644 index 0000000000..9cf287aa8b --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala @@ -0,0 +1,37 @@ +package akka.stream.dsl + +import org.scalatest.{ WordSpec, Matchers } + +import scala.collection.immutable.Seq + +class GraphSpec extends WordSpec with Matchers { + + val intSeq = IterableIn(Seq(1, 2, 3)) + + "Graph" should { + "broadcast" in { + val in1 = From[Int].map(_ * 2) + val in2 = From[Int].map(_.toString) + val out1 = From[Int].map(_.toString) + val out2 = From[Int].filter(_ % 2 == 0) + + Graph().broadcast(in1, Seq(out1, out2)) + "Graph().broadcast(in2, Seq(out1, out2))" shouldNot compile + } + "zip" in { + val in1 = From[Int] + val in2 = From[String] + val out1 = From[(Int, String)] + val out2 = From[(String, Int)] + + Graph().zip(in1, in2, out1) + "Graph().zip(in1, in2, out2)" shouldNot compile + } + "groupBy" in { + val in = From[Int] + val out = From[Int].map(_.toString) + + Graph().groupBy(in, (i: Int) ⇒ i % 2, out) + } + } +} From 6d165d28f9ab1bb291c5da7ae2438650803a00d2 Mon Sep 17 00:00:00 2001 From: Martynas Mickevicius Date: Wed, 20 Aug 2014 09:06:46 +0300 Subject: [PATCH 2/4] !str #15672 added all combinators from current Flow API * combinator tests --- .../src/main/scala/akka/stream/dsl/Flow.scala | 136 +++++++++++------- .../main/scala/akka/stream/dsl/Graph.scala | 16 +-- .../akka/stream/dsl/CombinatorSpec.scala | 86 +++++++++++ .../test/scala/akka/stream/dsl/FlowSpec.scala | 72 +++++----- .../scala/akka/stream/dsl/GraphSpec.scala | 34 +++-- 5 files changed, 234 insertions(+), 110 deletions(-) create mode 100644 akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala index 7f731baf63..b4f1e690f6 100644 --- a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala @@ -1,9 +1,12 @@ package akka.stream.dsl -import scala.collection.immutable.Iterable +import scala.collection.immutable import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import akka.stream.OverflowStrategy +import akka.stream.FlattenStrategy -trait Flow[-In, +Out] { +sealed trait Flow[-In, +Out] { val transform: Transform[In, Out] } @@ -18,7 +21,7 @@ object From { * Helper to create Flow with Input from Iterable. * Example usage: Flow(Seq(1,2,3)) */ - def apply[T](i: Iterable[T]): OpenOutputFlow[T, T] = From[T].withInput(IterableIn(i)) + def apply[T](i: immutable.Iterable[T]): OpenOutputFlow[T, T] = From[T].withInput(IterableIn(i)) /** * Helper to create Flow with Input from Future. @@ -35,7 +38,7 @@ trait Input[-In] * Changing In from Contravariant to Covariant is needed because Iterable[+A]. * But this brakes IterableIn variance and we get IterableIn(Seq(1,2,3)): IterableIn[Any] */ -case class IterableIn[-In](i: Iterable[_ >: In]) extends Input[In] +final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[In] /** * Input from Future @@ -43,12 +46,12 @@ case class IterableIn[-In](i: Iterable[_ >: In]) extends Input[In] * Changing In from Contravariant to Covariant is needed because Future[+A]. * But this brakes FutureIn variance and we get FutureIn(Future{1}): FutureIn[Any] */ -case class FutureIn[-In](f: Future[_ >: In]) extends Input[In] +final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In] trait Output[+Out] -case class FutureOut[+Out]() extends Output[Out] -case class PublisherOut[+Out]() extends Output[Out] +final case class FutureOut[+Out]() extends Output[Out] +final case class PublisherOut[+Out]() extends Output[Out] /** * Operations with a Flow which has open (no attached) Input. @@ -56,17 +59,17 @@ case class PublisherOut[+Out]() extends Output[Out] * No Out type parameter would be useful for Graph signatures, but we need it here * for `withInput` and `prependTransform` methods. */ -sealed trait HasOpenInput[-In, +Out] extends Flow[In, Out] { - type Repr[-I, +O] <: HasOpenInput[I, O] +sealed trait HasOpenInput[-In, +Out] { + type Repr[-In, +Out] <: HasOpenInput[In, Out] type AfterCloseInput[-In, +Out] <: Flow[In, Out] def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] // linear combinators with flows - def prepend[T](f: OpenFlow[T, In]) = + def prepend[T](f: OpenFlow[T, In]): Repr[T, Out] = prependTransform(f.transform) - def prependClosed[T](f: OpenOutputFlow[T, In]) = + def prepend[T](f: OpenOutputFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = prependTransform(f.transform).withInput(f.input) } @@ -76,67 +79,100 @@ sealed trait HasOpenInput[-In, +Out] extends Flow[In, Out] { * No In type parameter would be useful for Graph signatures, but we need it here * for `withOutput` and `appendTransform` methods. */ -trait HasOpenOutput[-In, +Out] extends Flow[In, Out] { - type Repr[-I, +O] <: HasOpenOutput[I, O] +trait HasOpenOutput[-In, +Out] { + type Repr[-In, +Out] <: HasOpenOutput[In, Out] type AfterCloseOutput[-In, +Out] <: Flow[In, Out] def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] - protected def appendTransform[I <: In, T](t: Transform[I, T]): Repr[I, T] + protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] // linear simple combinators - def map[T](f: Out ⇒ T) = appendTransform(transform ++ EmptyTransform[Out, T]()) - def filter(p: Out ⇒ Boolean) = appendTransform(transform ++ EmptyTransform[Out, Out]()) + def map[T](f: Out ⇒ T): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) + def mapFuture[T](f: Out ⇒ Future[T]): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) + def filter(p: Out ⇒ Boolean): Repr[In, Out] = + appendTransform(EmptyTransform[Out, Out]()) + def collect[T](pf: PartialFunction[Out, T]): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) + def fold[T](zero: T)(f: (T, Out) ⇒ T): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) + def drop(n: Int): Repr[In, Out] = + appendTransform(EmptyTransform[Out, Out]()) + def dropWithin(d: FiniteDuration): Repr[In, Out] = + appendTransform(EmptyTransform[Out, Out]()) + def take(n: Int): Repr[In, Out] = + appendTransform(EmptyTransform[Out, Out]()) + def takeWithin(d: FiniteDuration): Repr[In, Out] = + appendTransform(EmptyTransform[Out, Out]()) + def grouped(n: Int): Repr[In, immutable.Seq[Out]] = + appendTransform(EmptyTransform[Out, immutable.Seq[Out]]()) + def groupedWithin(n: Int, d: FiniteDuration): Repr[In, immutable.Seq[Out]] = + appendTransform(EmptyTransform[Out, immutable.Seq[Out]]()) + def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) + def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[In, S] = + appendTransform(EmptyTransform[Out, S]()) + def expand[S, O](seed: Out ⇒ S, extrapolate: S ⇒ (O, S)): Repr[In, O] = + appendTransform(EmptyTransform[Out, O]()) + def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[In, Out] = + appendTransform(EmptyTransform[Out, Out]()) - // linear combinator which produce multiple flows (is this still linear? move to graph?) + // linear combinators which produce multiple flows + def prefixAndTail[O >: Out](n: Int): Repr[In, (immutable.Seq[O], OpenOutputFlow[O, O])] = + appendTransform(EmptyTransform[Out, (immutable.Seq[O], OpenOutputFlow[O, O])]()) def groupBy[O >: Out, K](f: O ⇒ K): Repr[In, (K, OpenOutputFlow[O, O])] = - appendTransform(transform ++ EmptyTransform[Out, (K, OpenOutputFlow[O, O])]()) + appendTransform(EmptyTransform[Out, (K, OpenOutputFlow[O, O])]()) + def splitWhen[O >: Out](p: Out ⇒ Boolean): Repr[In, OpenOutputFlow[O, O]] = + appendTransform(EmptyTransform[Out, OpenOutputFlow[O, O]]()) + + // linear combinators which consume multiple flows + def flatten[O >: Out](strategy: FlattenStrategy[Out, O]): Repr[In, O] = + appendTransform(EmptyTransform[Out, O]()) // linear combinators with flows - def append[T](f: OpenFlow[Out, T]) = - appendTransform(transform ++ f.transform) - def appendClosed[T](f: OpenInputFlow[Out, T]) = - appendTransform(transform ++ f.transform).withOutput(f.output) - - // terminal combinators - def ToFuture = withOutput(FutureOut()) - def ToPublisher = withOutput(PublisherOut()) + def append[T](f: OpenFlow[Out, T]): Repr[In, T] = + appendTransform(f.transform) + def append[T](f: OpenInputFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = + appendTransform(f.transform).withOutput(f.output) } -case class OpenFlow[-In, +Out](transform: Transform[In, Out]) extends HasOpenOutput[In, Out] with HasOpenInput[In, Out] { - override type Repr[-I, +O] = OpenFlow[I, O] - type AfterCloseOutput[-I, +O] = OpenInputFlow[I, O] - type AfterCloseInput[-I, +O] = OpenOutputFlow[I, O] +final case class OpenFlow[-In, +Out](transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] with HasOpenInput[In, Out] { + override type Repr[-In, +Out] = OpenFlow[In, Out] + type AfterCloseOutput[-In, +Out] = OpenInputFlow[In, Out] + type AfterCloseInput[-In, +Out] = OpenOutputFlow[In, Out] - def withOutput[O >: Out](out: Output[O]) = OpenInputFlow(out, transform) - def withInput[I <: In](in: Input[I]) = OpenOutputFlow(in, transform) + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform) + def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = OpenOutputFlow(in, transform) - protected def prependTransform[T](t: Transform[T, In]) = OpenFlow(t ++ transform) - protected def appendTransform[I <: In, T](t: Transform[I, T]) = OpenFlow(t) + protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = OpenFlow(t ++ transform) + protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] = OpenFlow(transform ++ t) } -case class OpenInputFlow[-In, +Out](output: Output[Out], transform: Transform[In, Out]) extends HasOpenInput[In, Out] { - type Repr[-I, +O] = OpenInputFlow[I, O] - type AfterCloseInput[-I, +O] = ClosedFlow[I, O] +final case class OpenInputFlow[-In, +Out](output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenInput[In, Out] { + type Repr[-In, +Out] = OpenInputFlow[In, Out] + type AfterCloseInput[-In, +Out] = ClosedFlow[In, Out] - def withoutOutput = OpenFlow(transform) - def withInput[I <: In](in: Input[I]) = ClosedFlow(in, output, transform) + def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = ClosedFlow(in, output, transform) + def withoutOutput: OpenFlow[In, Out] = OpenFlow(transform) - protected def prependTransform[T](t: Transform[T, In]) = OpenInputFlow(output, t ++ transform) + protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = + OpenInputFlow(output, t ++ transform) } -case class OpenOutputFlow[-In, +Out](input: Input[In], transform: Transform[In, Out]) extends HasOpenOutput[In, Out] { - override type Repr[-I, +O] = OpenOutputFlow[I, O] - type AfterCloseOutput[-I, +O] = ClosedFlow[I, O] +final case class OpenOutputFlow[-In, +Out](input: Input[In], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] { + override type Repr[-In, +Out] = OpenOutputFlow[In, Out] + type AfterCloseOutput[-In, +Out] = ClosedFlow[In, Out] - def withOutput[O >: Out](out: Output[O]) = ClosedFlow(input, out, transform) - def withoutInput = OpenFlow(transform) + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform) + def withoutInput: OpenFlow[In, Out] = OpenFlow(transform) - protected def appendTransform[I <: In, T](t: Transform[I, T]) = OpenOutputFlow(input, t) + protected def appendTransform[T](t: Transform[Out, T]) = OpenOutputFlow(input, transform ++ t) } -case class ClosedFlow[-In, +Out](input: Input[In], output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] { - def withoutOutput = OpenOutputFlow(input, transform) - def withoutInput = OpenInputFlow(output, transform) +final case class ClosedFlow[-In, +Out](input: Input[In], output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] { + def withoutOutput: OpenOutputFlow[In, Out] = OpenOutputFlow(input, transform) + def withoutInput: OpenInputFlow[In, Out] = OpenInputFlow(output, transform) def run(): Unit = () } @@ -144,4 +180,4 @@ case class ClosedFlow[-In, +Out](input: Input[In], output: Output[Out], transfor trait Transform[-In, +Out] { def ++[T](t: Transform[Out, T]): Transform[In, T] = EmptyTransform[In, T]() } -case class EmptyTransform[-In, +Out]() extends Transform[In, Out] +final case class EmptyTransform[-In, +Out]() extends Transform[In, Out] diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala index 02ea979fef..6a329df5fd 100644 --- a/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala @@ -4,18 +4,16 @@ package akka.stream.dsl * Dummy implementation needed for runtime tests. */ trait Graph { - def broadcast[T](source: HasOpenOutput[_, T], destinations: Seq[HasOpenInput[T, _]]) = () - def zip[T, U](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, U], destination: HasOpenInput[(T, U), _]) = () - /** - * Whenever a new K value is returned from `f`, a new `dest` flow will be materialized. - * - * The discriminating value `K` of the output flow can be known by applying `f` on any - * element in the output flows. - */ - def groupBy[T, K](source: HasOpenOutput[_, T], f: T ⇒ K, dest: HasOpenInput[T, _]) = () + def merge[T](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, T], destination: HasOpenInput[T, _]) = () + def zip[T, U](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, U], destination: HasOpenInput[(T, U), _]) = () + def concat[T](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, T], destination: HasOpenInput[T, _]) = () + def broadcast[T](source: HasOpenOutput[_, T], destinations: Seq[HasOpenInput[T, _]]) = () } object Graph { def apply(): Graph = new Graph {} } + +final case class Broadcast[T]() extends Input[T] with Output[T] +final case class Zip[T]() extends Input[T] with Output[T] diff --git a/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala new file mode 100644 index 0000000000..c5d783dfe8 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala @@ -0,0 +1,86 @@ +package akka.stream.dsl + +import org.scalatest.Matchers +import org.scalatest.WordSpec +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.concurrent.Future +import akka.stream.OverflowStrategy + +class CombinatorSpec extends WordSpec with Matchers { + val f = From[Int] + + "Linear simple combinators in Flow" should { + "map" in { + val t: OpenFlow[Int, Int] = f.map(_ * 2) + } + "mapFuture" in { + import scala.concurrent.ExecutionContext.Implicits.global + val t: OpenFlow[Int, Int] = f.mapFuture(Future(_)) + } + "filter" in { + val t: OpenFlow[Int, Int] = f.filter(_ != 2) + } + "collect" in { + val t: OpenFlow[Int, String] = f.collect { + case i: Int if i == 2 ⇒ "two" + } + } + "fold" in { + val t: OpenFlow[Int, String] = f.fold("elements:") { (soFar, element) ⇒ soFar + element } + } + "drop" in { + val t: OpenFlow[Int, Int] = f.drop(2) + } + "dropWithin" in { + val t: OpenFlow[Int, Int] = f.dropWithin(2.seconds) + } + "take" in { + val t: OpenFlow[Int, Int] = f.take(2) + } + "takeWithin" in { + val t: OpenFlow[Int, Int] = f.takeWithin(2.seconds) + } + "grouped" in { + val t: OpenFlow[Int, immutable.Seq[Int]] = f.grouped(2) + } + "groupedWithin" in { + val t: OpenFlow[Int, immutable.Seq[Int]] = f.groupedWithin(2, 2.seconds) + } + "mapConcat" in { + val t: OpenFlow[Int, Int] = f.mapConcat { i ⇒ immutable.Seq(i, i, i) } + } + "conflate" in { + val t: OpenFlow[Int, String] = f.conflate(_.toString, (soFar: String, i) ⇒ soFar + i) + } + "expand" in { + val t: OpenFlow[Int, String] = f.expand(_.toString, (soFar: String) ⇒ (soFar, "_")) + } + "buffer" in { + val t: OpenFlow[Int, Int] = f.buffer(100, OverflowStrategy.DropHead) + } + } + + "Linear combinators which produce multiple flows" should { + "prefixAndTail" in { + val t: OpenFlow[Int, (immutable.Seq[String], OpenOutputFlow[String, String])] = + f.map(_.toString).prefixAndTail(10) + } + "groupBy" in { + val grouped: OpenOutputFlow[Int, (Int, OpenOutputFlow[Int, Int])] = + From(immutable.Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o % 3) + + val closedInner: OpenOutputFlow[Int, (Int, ClosedFlow[Int, Int])] = grouped.map { + case (key, openFlow) ⇒ (key, openFlow.withOutput(FutureOut())) + } + + // both of these compile, even if `grouped` has inner flows unclosed + grouped.withOutput(FutureOut()).run + closedInner.withOutput(FutureOut()).run + } + "splitWhen" in { + val t: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) + } + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala index a01efdb55b..e533f418c6 100644 --- a/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala @@ -15,83 +15,73 @@ class FlowSpec extends WordSpec with Matchers { "OpenFlow" should { "go through all states" in { - val f = From[Int] + val f: OpenFlow[Int, Int] = From[Int] .withInput(intSeq) .withOutput(FutureOut()) .withoutInput .withoutOutput } "should not run" in { - val open = From[Int] + val open: OpenFlow[Int, Int] = From[Int] "open.run" shouldNot compile } "accept IterableIn" in { - val f = From[Int].withInput(intSeq) + val f: OpenOutputFlow[Int, Int] = From[Int].withInput(intSeq) } "accept FutureIn" in { - val f = From[Int].withInput(intFut) + val f: OpenOutputFlow[Int, Int] = From[Int].withInput(intFut) } "append OpenFlow" in { - val open1 = From[Int].map(_.toString) - val open2 = From[String].map(_.hashCode) - val open3 = open1.append(open2) + val open1: OpenFlow[Int, String] = From[Int].map(_.toString) + val open2: OpenFlow[String, Int] = From[String].map(_.hashCode) + val open3: OpenFlow[Int, Int] = open1.append(open2) "open3.run" shouldNot compile - val closedInput = open3.withInput(intSeq) + val closedInput: OpenOutputFlow[Int, Int] = open3.withInput(intSeq) "closedInput.run" shouldNot compile - val closedOutput = open3.ToFuture + val closedOutput: OpenInputFlow[Int, Int] = open3.withOutput(FutureOut()) "closedOutput.run" shouldNot compile - closedInput.ToFuture.run + closedInput.withOutput(FutureOut()).run closedOutput.withInput(intSeq).run } "prepend OpenFlow" in { - val open1 = From[Int].map(_.toString) - val open2 = From[String].map(_.hashCode) - val open3 = open1.prepend(open2) + val open1: OpenFlow[Int, String] = From[Int].map(_.toString) + val open2: OpenFlow[String, Int] = From[String].map(_.hashCode) + val open3: OpenFlow[String, String] = open1.prepend(open2) "open3.run" shouldNot compile - val closedInput = open3.withInput(strSeq) + val closedInput: OpenOutputFlow[String, String] = open3.withInput(strSeq) "closedInput.run" shouldNot compile - val closedOutput = open3.ToFuture + val closedOutput: OpenInputFlow[String, String] = open3.withOutput(FutureOut()) "closedOutput.run" shouldNot compile - closedInput.ToFuture.run + closedInput.withOutput(FutureOut()).run closedOutput.withInput(strSeq).run } "append OpenInputFlow" in { - val open = From[Int].map(_.toString) - val closed = From[String].map(_.hashCode).ToFuture - val appended = open.appendClosed(closed) + val open: OpenFlow[Int, String] = From[Int].map(_.toString) + val closedOutput: OpenInputFlow[String, Int] = From[String].map(_.hashCode).withOutput(FutureOut()) + val appended: OpenInputFlow[Int, Int] = open.append(closedOutput) "appended.run" shouldNot compile "appended.toFuture" shouldNot compile appended.withInput(intSeq).run } "prepend OpenOutputFlow" in { - val open = From[Int].map(_.toString) - val closed = From[String].map(_.hashCode).withInput(strSeq) - val appended = open.prependClosed(closed) - "appended.run" shouldNot compile - "appended.withInput(strSeq)" shouldNot compile - appended.ToFuture.run - } - "groupBy" in { - val grouped = From(Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o % 3) - - val closedInner = grouped.map { - case (key, openFlow) ⇒ (key, openFlow.ToFuture) - } - - // both of these compile, even if `grouped` has inner flows unclosed - grouped.ToFuture.run - closedInner.ToFuture.run + val open: OpenFlow[Int, String] = From[Int].map(_.toString) + val closedInput: OpenOutputFlow[String, Int] = From[String].map(_.hashCode).withInput(strSeq) + val prepended: OpenOutputFlow[String, String] = open.prepend(closedInput) + "prepended.run" shouldNot compile + "prepended.withInput(strSeq)" shouldNot compile + prepended.withOutput(FutureOut()).run } } "OpenInputFlow" should { - val openInput = From[Int].map(_.toString).ToFuture + val openInput: OpenInputFlow[Int, String] = + From[Int].map(_.toString).withOutput(FutureOut()) "accept Input" in { openInput.withInput(intSeq) } @@ -110,9 +100,10 @@ class FlowSpec extends WordSpec with Matchers { } "OpenOutputFlow" should { - val openOutput = From(Seq(1, 2, 3)).map(_.toString) + val openOutput: OpenOutputFlow[Int, String] = + From(Seq(1, 2, 3)).map(_.toString) "accept Output" in { - openOutput.ToFuture + openOutput.withOutput(FutureOut()) } "drop Input" in { openOutput.withoutInput @@ -129,7 +120,8 @@ class FlowSpec extends WordSpec with Matchers { } "ClosedFlow" should { - val closed = From(Seq(1, 2, 3)).map(_.toString).ToFuture + val closed: ClosedFlow[Int, String] = + From(Seq(1, 2, 3)).map(_.toString).withOutput(FutureOut()) "run" in { closed.run } diff --git a/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala index 9cf287aa8b..f50f25be45 100644 --- a/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala @@ -9,14 +9,14 @@ class GraphSpec extends WordSpec with Matchers { val intSeq = IterableIn(Seq(1, 2, 3)) "Graph" should { - "broadcast" in { - val in1 = From[Int].map(_ * 2) - val in2 = From[Int].map(_.toString) - val out1 = From[Int].map(_.toString) - val out2 = From[Int].filter(_ % 2 == 0) + "merge" in { + val in1 = From[Int] + val in2 = From[Int] + val out1 = From[Int] + val out2 = From[String] - Graph().broadcast(in1, Seq(out1, out2)) - "Graph().broadcast(in2, Seq(out1, out2))" shouldNot compile + Graph().merge(in1, in2, out1) + "Graph().merge(in1, in2, out2)" shouldNot compile } "zip" in { val in1 = From[Int] @@ -27,11 +27,23 @@ class GraphSpec extends WordSpec with Matchers { Graph().zip(in1, in2, out1) "Graph().zip(in1, in2, out2)" shouldNot compile } - "groupBy" in { - val in = From[Int] - val out = From[Int].map(_.toString) + "concat" in { + val in1 = From[Int] + val in2 = From[Int] + val out1 = From[Int] + val out2 = From[String] - Graph().groupBy(in, (i: Int) ⇒ i % 2, out) + Graph().concat(in1, in2, out1) + "Graph().concat(in1, in2, out2)" shouldNot compile + } + "broadcast" in { + val in1 = From[Int].map(_ * 2) + val in2 = From[Int].map(_.toString) + val out1 = From[Int].map(_.toString) + val out2 = From[Int].filter(_ % 2 == 0) + + Graph().broadcast(in1, Seq(out1, out2)) + "Graph().broadcast(in2, Seq(out1, out2))" shouldNot compile } } } From 59e9f7162943256822c5b751e421342ca68e76ea Mon Sep 17 00:00:00 2001 From: Martynas Mickevicius Date: Fri, 22 Aug 2014 12:15:04 +0300 Subject: [PATCH 3/4] !str #15672 add HTTP Pipeline with the new DSL * implement graph operations as case class junctions * add materialization method to ClosedFlow --- .../server/NewDslHttpServerPipeline.scala | 114 ++++++++++++++++++ .../src/main/scala/akka/stream/dsl/Flow.scala | 96 +++++++++++++-- .../main/scala/akka/stream/dsl/Graph.scala | 32 +++-- .../akka/stream/dsl/CombinatorSpec.scala | 24 ++-- .../test/scala/akka/stream/dsl/FlowSpec.scala | 20 +-- .../scala/akka/stream/dsl/GraphSpec.scala | 60 +++++---- 6 files changed, 279 insertions(+), 67 deletions(-) create mode 100644 akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala diff --git a/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala new file mode 100644 index 0000000000..aa5fb7f46e --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala @@ -0,0 +1,114 @@ +package akka.http.server + +import akka.event.LoggingAdapter +import akka.http.Http +import akka.http.model.{ ErrorInfo, HttpRequest, HttpResponse } +import akka.http.parsing.HttpRequestParser +import akka.http.parsing.ParserOutput._ +import akka.http.rendering.ResponseRenderingContext +import akka.stream.dsl.{ FlowMaterializer ⇒ NewFM, _ } +import akka.stream.io.StreamTcp +import akka.stream.{ FlowMaterializer, Transformer } +import akka.util.ByteString + +class NewDslHttpServerPipeline(settings: ServerSettings, + materializer: FlowMaterializer, + log: LoggingAdapter) { + + import akka.http.server.NewDslHttpServerPipeline._ + + val rootParser = new HttpRequestParser(settings.parserSettings, settings.rawRequestUriHeader, materializer)() + val warnOnIllegalHeader: ErrorInfo ⇒ Unit = errorInfo ⇒ + if (settings.parserSettings.illegalHeaderWarnings) + log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty) + + val responseRendererFactory = new { + def newRenderer: Transformer[ResponseRenderingContext, OpenOutputFlow[ByteString, ByteString]] = ??? + } + + /** + * Flow graph: + * + * tcpConn.inputStream ---> requestFlowBeforeBroadcast -+-> requestFlowAfterBroadcast ---> Publisher[HttpRequest] + * | + * \-> applicationBypassFlow -\ + * | + * Subscriber[HttpResponse] ---> responseFlowBeforeMerge -+-> responseFlowAfterMerge --> tcpConn.outputStream + */ + def apply(tcpConn: StreamTcp.IncomingTcpConnection) = { + + val broadcast = Broadcast[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]() + val merge = Merge[MessageStart, HttpResponse, Any]() + + val requestFlowBeforeBroadcast: ClosedFlow[ByteString, (RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] = + From(tcpConn.inputStream) + .transform(rootParser.copyWith(warnOnIllegalHeader)) + .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) + .headAndTail() + .withOutput(broadcast.in) + + val applicationBypassFlow: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), MessageStart] = + From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] + .withInput(broadcast.out1) + .collect[MessageStart with RequestOutput] { case (x: MessageStart, _) ⇒ x } + .withOutput(merge.in1) + + val requestFlowAfterBroadcast: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), HttpRequest] = + From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] + .withInput(broadcast.out2) + .collect { + case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts) ⇒ + val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) + val pub = entityParts.withOutput(PublisherOut()).as(NewFM.publisher[RequestOutput]) + HttpRequest(method, effectiveUri, headers, createEntity(pub), protocol) + } + .withOutput(PublisherOut()) + + val responseFlowBeforeMerge: ClosedFlow[HttpResponse, HttpResponse] = + From[HttpResponse] + .withInput(SubscriberIn()) + .withOutput(merge.in2) + + val responseFlowAfterMerge: ClosedFlow[Any, ByteString] = + From[Any] + .withInput(merge.out) + .transform(applyApplicationBypass) + .transform(responseRendererFactory.newRenderer) + .flatten(FlattenStrategy.concatOpenOutputFlow) + .transform(errorLogger(log, "Outgoing response stream error")) + .withOutput(SubscriberOut(tcpConn.outputStream)) + + val requestPublisher = requestFlowAfterBroadcast.as(NewFM.publisher[HttpRequest]) + val responseSubscriber = responseFlowBeforeMerge.as(NewFM.subscriber[HttpResponse]) + + Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher, responseSubscriber) + } + + def applyApplicationBypass: Transformer[Any, ResponseRenderingContext] = ??? + + private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = ??? +} + +object NewDslHttpServerPipeline { + + /** + * FIXME: We can't use `HasOpenOutput` here, because conversion would convert either `OpenFlow` + * or `OpenOutputFlow` to `HasOpenOutput`. + * + * Therefore we need two separate conversions, one for `OpeFlow` another for `OpenOutputFlow`. + */ + implicit class OpenOutputFlowWithHeadAndTail[In, InnerIn, InnerOut](val underlying: OpenOutputFlow[In, OpenOutputFlow[InnerIn, InnerOut]]) extends AnyVal { + def headAndTail(): OpenOutputFlow[In, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] = { + val flow: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], OpenOutputFlow[InnerIn, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])]] = + From[OpenOutputFlow[InnerIn, InnerOut]] + .map { f ⇒ + f.prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) } + } + + val flattened: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] = + flow.flatten(FlattenStrategy.concatOpenOutputFlow) + + underlying.append(flattened) + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala index b4f1e690f6..41ed99eed3 100644 --- a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala @@ -1,10 +1,12 @@ package akka.stream.dsl +import akka.stream.impl.Ast +import org.reactivestreams.{ Subscriber, Publisher } + import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration -import akka.stream.OverflowStrategy -import akka.stream.FlattenStrategy +import akka.stream.{ Transformer, OverflowStrategy, FlattenStrategy } sealed trait Flow[-In, +Out] { val transform: Transform[In, Out] @@ -28,10 +30,26 @@ object From { * Example usage: Flow(Future { 1 }) */ def apply[T](f: Future[T]): OpenOutputFlow[T, T] = From[T].withInput(FutureIn(f)) + + /** + * Helper to create Flow with Input from Publisher. + */ + def apply[T](p: Publisher[T]): OpenOutputFlow[T, T] = From[T].withInput(PublisherIn(p)) } trait Input[-In] +/** + * Default input. + * Allows to materialize a Flow with this input to Subscriber. + */ +final case class SubscriberIn[-In]() extends Input[In] + +/** + * Input from Publisher. + */ +final case class PublisherIn[-In](p: Publisher[_ >: In]) extends Input[In] + /** * Input from Iterable * @@ -48,11 +66,28 @@ final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[I */ final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In] -trait Output[+Out] +trait Output[+Out] { + def transform[T, O >: Out]: Transform[O, T] = EmptyTransform[O, T] +} -final case class FutureOut[+Out]() extends Output[Out] +/** + * Default output. Does no reducing operations. + * Allows to materialize a Flow with this output to Publisher. + */ final case class PublisherOut[+Out]() extends Output[Out] +/** + * Output to a Subscriber. + */ +final case class SubscriberOut[+Out](s: Subscriber[_ <: Out]) extends Output[Out] + +/** + * Fold output. Reduces output stream according to the given fold function. + */ +final case class FoldOut[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Output[Out] { + override def transform[T, O >: Out]: Transform[O, T] = EmptyTransform[O, T] +} + /** * Operations with a Flow which has open (no attached) Input. * @@ -95,8 +130,6 @@ trait HasOpenOutput[-In, +Out] { appendTransform(EmptyTransform[Out, Out]()) def collect[T](pf: PartialFunction[Out, T]): Repr[In, T] = appendTransform(EmptyTransform[Out, T]()) - def fold[T](zero: T)(f: (T, Out) ⇒ T): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) def drop(n: Int): Repr[In, Out] = appendTransform(EmptyTransform[Out, Out]()) def dropWithin(d: FiniteDuration): Repr[In, Out] = @@ -111,6 +144,8 @@ trait HasOpenOutput[-In, +Out] { appendTransform(EmptyTransform[Out, immutable.Seq[Out]]()) def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[In, T] = appendTransform(EmptyTransform[Out, T]()) + def transform[T](transformer: Transformer[Out, T]): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[In, S] = appendTransform(EmptyTransform[Out, S]()) def expand[S, O](seed: Out ⇒ S, extrapolate: S ⇒ (O, S)): Repr[In, O] = @@ -127,8 +162,8 @@ trait HasOpenOutput[-In, +Out] { appendTransform(EmptyTransform[Out, OpenOutputFlow[O, O]]()) // linear combinators which consume multiple flows - def flatten[O >: Out](strategy: FlattenStrategy[Out, O]): Repr[In, O] = - appendTransform(EmptyTransform[Out, O]()) + def flatten[T](strategy: FlattenStrategy[Out, T]): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) // linear combinators with flows def append[T](f: OpenFlow[Out, T]): Repr[In, T] = @@ -142,7 +177,7 @@ final case class OpenFlow[-In, +Out](transform: Transform[In, Out]) extends Flow type AfterCloseOutput[-In, +Out] = OpenInputFlow[In, Out] type AfterCloseInput[-In, +Out] = OpenOutputFlow[In, Out] - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform) + def withOutput[O](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform ++ out.transform) def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = OpenOutputFlow(in, transform) protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = OpenFlow(t ++ transform) @@ -164,7 +199,7 @@ final case class OpenOutputFlow[-In, +Out](input: Input[In], transform: Transfor override type Repr[-In, +Out] = OpenOutputFlow[In, Out] type AfterCloseOutput[-In, +Out] = ClosedFlow[In, Out] - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform) + def withOutput[O](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform ++ out.transform) def withoutInput: OpenFlow[In, Out] = OpenFlow(transform) protected def appendTransform[T](t: Transform[Out, T]) = OpenOutputFlow(input, transform ++ t) @@ -175,9 +210,50 @@ final case class ClosedFlow[-In, +Out](input: Input[In], output: Output[Out], tr def withoutInput: OpenInputFlow[In, Out] = OpenInputFlow(output, transform) def run(): Unit = () + + def as[O >: Out, R[_]](implicit materializer: FlowMaterializer.M[O, R[O]]): R[O] = ??? } trait Transform[-In, +Out] { def ++[T](t: Transform[Out, T]): Transform[In, T] = EmptyTransform[In, T]() } final case class EmptyTransform[-In, +Out]() extends Transform[In, Out] + +object FlattenStrategy { + def concatOpenOutputFlow[In, Out]: FlattenStrategy[OpenOutputFlow[In, Out], Out] = ConcatOpenOutputFlow[In, Out]() + def concatOpenFlow[In, Out]: FlattenStrategy[OpenFlow[In, Out], Out] = ConcatOpenFlow[In, Out]() + + final case class ConcatOpenOutputFlow[In, Out]() extends FlattenStrategy[OpenOutputFlow[In, Out], Out] + final case class ConcatOpenFlow[In, Out]() extends FlattenStrategy[OpenFlow[In, Out], Out] +} + +/** + * At the end we need to materialize the stream. It could be done by setting an output + * with a transformation (e.g Out -> Publisher[Out]) which would do the materialization. + * + * Or we could have an additional step on ClosedFlow like the one Viktor suggested. + * https://github.com/akka/akka/issues/15633#issuecomment-52307292 + */ +object FlowMaterializer { + trait M[T, R] { + def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): R + } + + def publisher[T]: FlowMaterializer.M[T, Publisher[T]] = publisizer.asInstanceOf[FlowMaterializer.M[T, Publisher[T]]] + private[this] final val publisizer = mkPublisher[Any] + private[this] def mkPublisher[T] = new FlowMaterializer.M[T, Publisher[T]] { + def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Publisher[T] = ??? + } + + def subscriber[T]: FlowMaterializer.M[T, Subscriber[T]] = publisizer.asInstanceOf[FlowMaterializer.M[T, Subscriber[T]]] + private[this] final val sbscrisizer = mkSubscriber[Any] + private[this] def mkSubscriber[T] = new FlowMaterializer.M[T, Subscriber[T]] { + def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Subscriber[T] = ??? + } + + def future[T]: FlowMaterializer.M[T, Future[T]] = futurizer.asInstanceOf[M[T, Future[T]]] + private[this] final val futurizer = mkFuture[Any] + private[this] def mkFuture[T] = new M[T, Future[T]] { + def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Future[T] = ??? + } +} diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala index 6a329df5fd..3030f6cff9 100644 --- a/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala @@ -1,19 +1,25 @@ package akka.stream.dsl -/** - * Dummy implementation needed for runtime tests. - */ -trait Graph { - - def merge[T](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, T], destination: HasOpenInput[T, _]) = () - def zip[T, U](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, U], destination: HasOpenInput[(T, U), _]) = () - def concat[T](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, T], destination: HasOpenInput[T, _]) = () - def broadcast[T](source: HasOpenOutput[_, T], destinations: Seq[HasOpenInput[T, _]]) = () +final case class Merge[T, U, V >: T with U]() { + val in1 = new Output[T] {} + val in2 = new Output[U] {} + val out = new Input[V] {} } -object Graph { - def apply(): Graph = new Graph {} +final case class Zip[T, U]() { + val in1 = new Output[T] {} + val in2 = new Output[U] {} + val out = new Input[(T, U)] {} } -final case class Broadcast[T]() extends Input[T] with Output[T] -final case class Zip[T]() extends Input[T] with Output[T] +final case class Concat[T, U, V >: T with U]() { + val in1 = new Output[T] {} + val in2 = new Output[U] {} + val out = new Input[V] {} +} + +final case class Broadcast[T]() { + val in = new Output[T] {} + val out1 = new Input[T] {} + val out2 = new Input[T] {} +} diff --git a/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala index c5d783dfe8..819aa5552c 100644 --- a/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala @@ -27,7 +27,8 @@ class CombinatorSpec extends WordSpec with Matchers { } } "fold" in { - val t: OpenFlow[Int, String] = f.fold("elements:") { (soFar, element) ⇒ soFar + element } + val fo = FoldOut("elements:") { (soFar, element: Int) ⇒ soFar + element } + val t: OpenInputFlow[Int, String] = f.withOutput(fo) } "drop" in { val t: OpenFlow[Int, Int] = f.drop(2) @@ -67,20 +68,27 @@ class CombinatorSpec extends WordSpec with Matchers { f.map(_.toString).prefixAndTail(10) } "groupBy" in { - val grouped: OpenOutputFlow[Int, (Int, OpenOutputFlow[Int, Int])] = - From(immutable.Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o % 3) + val grouped: OpenOutputFlow[Int, (String, OpenOutputFlow[Int, Int])] = + From(immutable.Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o.toString) - val closedInner: OpenOutputFlow[Int, (Int, ClosedFlow[Int, Int])] = grouped.map { - case (key, openFlow) ⇒ (key, openFlow.withOutput(FutureOut())) + val closedInner: OpenOutputFlow[Int, (String, ClosedFlow[Int, Int])] = grouped.map { + case (key, openFlow) ⇒ (key, openFlow.withOutput(PublisherOut())) } // both of these compile, even if `grouped` has inner flows unclosed - grouped.withOutput(FutureOut()).run - closedInner.withOutput(FutureOut()).run + grouped.withOutput(PublisherOut()).run + closedInner.withOutput(PublisherOut()).run } "splitWhen" in { val t: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) } } -} \ No newline at end of file + "Linear combinators which consume multiple flows" should { + "flatten" in { + val split: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) + val flattened: OpenFlow[Int, String] = split.flatten(FlattenStrategy.concatOpenOutputFlow) + } + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala index e533f418c6..fe6cee34dd 100644 --- a/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala @@ -17,7 +17,7 @@ class FlowSpec extends WordSpec with Matchers { "go through all states" in { val f: OpenFlow[Int, Int] = From[Int] .withInput(intSeq) - .withOutput(FutureOut()) + .withOutput(PublisherOut()) .withoutInput .withoutOutput } @@ -40,10 +40,10 @@ class FlowSpec extends WordSpec with Matchers { val closedInput: OpenOutputFlow[Int, Int] = open3.withInput(intSeq) "closedInput.run" shouldNot compile - val closedOutput: OpenInputFlow[Int, Int] = open3.withOutput(FutureOut()) + val closedOutput: OpenInputFlow[Int, Int] = open3.withOutput(PublisherOut()) "closedOutput.run" shouldNot compile - closedInput.withOutput(FutureOut()).run + closedInput.withOutput(PublisherOut()).run closedOutput.withInput(intSeq).run } "prepend OpenFlow" in { @@ -55,15 +55,15 @@ class FlowSpec extends WordSpec with Matchers { val closedInput: OpenOutputFlow[String, String] = open3.withInput(strSeq) "closedInput.run" shouldNot compile - val closedOutput: OpenInputFlow[String, String] = open3.withOutput(FutureOut()) + val closedOutput: OpenInputFlow[String, String] = open3.withOutput(PublisherOut()) "closedOutput.run" shouldNot compile - closedInput.withOutput(FutureOut()).run + closedInput.withOutput(PublisherOut()).run closedOutput.withInput(strSeq).run } "append OpenInputFlow" in { val open: OpenFlow[Int, String] = From[Int].map(_.toString) - val closedOutput: OpenInputFlow[String, Int] = From[String].map(_.hashCode).withOutput(FutureOut()) + val closedOutput: OpenInputFlow[String, Int] = From[String].map(_.hashCode).withOutput(PublisherOut()) val appended: OpenInputFlow[Int, Int] = open.append(closedOutput) "appended.run" shouldNot compile "appended.toFuture" shouldNot compile @@ -75,13 +75,13 @@ class FlowSpec extends WordSpec with Matchers { val prepended: OpenOutputFlow[String, String] = open.prepend(closedInput) "prepended.run" shouldNot compile "prepended.withInput(strSeq)" shouldNot compile - prepended.withOutput(FutureOut()).run + prepended.withOutput(PublisherOut()).run } } "OpenInputFlow" should { val openInput: OpenInputFlow[Int, String] = - From[Int].map(_.toString).withOutput(FutureOut()) + From[Int].map(_.toString).withOutput(PublisherOut()) "accept Input" in { openInput.withInput(intSeq) } @@ -103,7 +103,7 @@ class FlowSpec extends WordSpec with Matchers { val openOutput: OpenOutputFlow[Int, String] = From(Seq(1, 2, 3)).map(_.toString) "accept Output" in { - openOutput.withOutput(FutureOut()) + openOutput.withOutput(PublisherOut()) } "drop Input" in { openOutput.withoutInput @@ -121,7 +121,7 @@ class FlowSpec extends WordSpec with Matchers { "ClosedFlow" should { val closed: ClosedFlow[Int, String] = - From(Seq(1, 2, 3)).map(_.toString).withOutput(FutureOut()) + From(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut()) "run" in { closed.run } diff --git a/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala index f50f25be45..46647af26f 100644 --- a/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala @@ -6,44 +6,52 @@ import scala.collection.immutable.Seq class GraphSpec extends WordSpec with Matchers { - val intSeq = IterableIn(Seq(1, 2, 3)) - "Graph" should { "merge" in { - val in1 = From[Int] - val in2 = From[Int] - val out1 = From[Int] - val out2 = From[String] + val merge = Merge[Int, Int, Int]() - Graph().merge(in1, in2, out1) - "Graph().merge(in1, in2, out2)" shouldNot compile + val in1 = From[Int].withOutput(merge.in1) + val in2 = From[Int].withOutput(merge.in2) + val out1 = From[Int].withInput(merge.out) + + val out2 = From[String] + // FIXME: make me not compile + //"out2.withInput(merge.out)" shouldNot compile } "zip" in { - val in1 = From[Int] - val in2 = From[String] - val out1 = From[(Int, String)] - val out2 = From[(String, Int)] + val zip = Zip[Int, String]() - Graph().zip(in1, in2, out1) - "Graph().zip(in1, in2, out2)" shouldNot compile + val in1 = From[Int].withOutput(zip.in1) + val in2 = From[String].withOutput(zip.in2) + val out1 = From[(Int, String)].withInput(zip.out) + + val out2 = From[(String, Int)] + // FIXME: make me not compile + //"out2.withInput(zip.out)" shouldNot compile } "concat" in { - val in1 = From[Int] - val in2 = From[Int] - val out1 = From[Int] - val out2 = From[String] + trait A + trait B extends A - Graph().concat(in1, in2, out1) - "Graph().concat(in1, in2, out2)" shouldNot compile + val concat = Concat[A, B, A]() + val in1 = From[A].withOutput(concat.in1) + val in2 = From[B].withOutput(concat.in2) + val out1 = From[A].withInput(concat.out) + + val out2 = From[String] + // FIXME: make me not compile + //"out2.withInput(concat.out)" shouldNot compile } "broadcast" in { - val in1 = From[Int].map(_ * 2) - val in2 = From[Int].map(_.toString) - val out1 = From[Int].map(_.toString) - val out2 = From[Int].filter(_ % 2 == 0) + val broadcast = Broadcast[Int]() - Graph().broadcast(in1, Seq(out1, out2)) - "Graph().broadcast(in2, Seq(out1, out2))" shouldNot compile + val in1 = From[Int].withOutput(broadcast.in) + val in2 = From[Int].withInput(broadcast.out1) + val out1 = From[Int].withInput(broadcast.out2) + + val out2 = From[String] + // FIXME: make me not compile + //"out2.withInput(broadcast.out2)" shouldNot compile } } } From 3c38706bb791aecf51613707b9ac1b015984171a Mon Sep 17 00:00:00 2001 From: Martynas Mickevicius Date: Fri, 22 Aug 2014 14:19:41 +0300 Subject: [PATCH 4/4] !str #15672 add a way to retrieve Publsiher or Subscriber from Output/Input * also remove ClosedFlow.as(...) --- .../server/NewDslHttpServerPipeline.scala | 18 +++---- .../src/main/scala/akka/stream/dsl/Flow.scala | 53 ++++--------------- .../akka/stream/dsl/CombinatorSpec.scala | 2 +- 3 files changed, 21 insertions(+), 52 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala index aa5fb7f46e..4a9defde83 100644 --- a/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala +++ b/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala @@ -6,7 +6,7 @@ import akka.http.model.{ ErrorInfo, HttpRequest, HttpResponse } import akka.http.parsing.HttpRequestParser import akka.http.parsing.ParserOutput._ import akka.http.rendering.ResponseRenderingContext -import akka.stream.dsl.{ FlowMaterializer ⇒ NewFM, _ } +import akka.stream.dsl._ import akka.stream.io.StreamTcp import akka.stream.{ FlowMaterializer, Transformer } import akka.util.ByteString @@ -53,20 +53,23 @@ class NewDslHttpServerPipeline(settings: ServerSettings, .collect[MessageStart with RequestOutput] { case (x: MessageStart, _) ⇒ x } .withOutput(merge.in1) + val requestPublisher = PublisherOut[HttpRequest]() val requestFlowAfterBroadcast: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), HttpRequest] = From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] .withInput(broadcast.out2) .collect { case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts) ⇒ val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) - val pub = entityParts.withOutput(PublisherOut()).as(NewFM.publisher[RequestOutput]) - HttpRequest(method, effectiveUri, headers, createEntity(pub), protocol) + val publisher = PublisherOut[RequestOutput]() + val flow = entityParts.withOutput(publisher) + HttpRequest(method, effectiveUri, headers, createEntity(publisher.publisher), protocol) } - .withOutput(PublisherOut()) + .withOutput(requestPublisher) + val responseSubscriber = SubscriberIn[HttpResponse]() val responseFlowBeforeMerge: ClosedFlow[HttpResponse, HttpResponse] = From[HttpResponse] - .withInput(SubscriberIn()) + .withInput(responseSubscriber) .withOutput(merge.in2) val responseFlowAfterMerge: ClosedFlow[Any, ByteString] = @@ -78,10 +81,7 @@ class NewDslHttpServerPipeline(settings: ServerSettings, .transform(errorLogger(log, "Outgoing response stream error")) .withOutput(SubscriberOut(tcpConn.outputStream)) - val requestPublisher = requestFlowAfterBroadcast.as(NewFM.publisher[HttpRequest]) - val responseSubscriber = responseFlowBeforeMerge.as(NewFM.subscriber[HttpResponse]) - - Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher, responseSubscriber) + Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher.publisher, responseSubscriber.subscriber) } def applyApplicationBypass: Transformer[Any, ResponseRenderingContext] = ??? diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala index 41ed99eed3..aa2edb4dd8 100644 --- a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala @@ -43,7 +43,9 @@ trait Input[-In] * Default input. * Allows to materialize a Flow with this input to Subscriber. */ -final case class SubscriberIn[-In]() extends Input[In] +final case class SubscriberIn[-In]() extends Input[In] { + def subscriber[I <: In]: Subscriber[I] = ??? +} /** * Input from Publisher. @@ -66,15 +68,15 @@ final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[I */ final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In] -trait Output[+Out] { - def transform[T, O >: Out]: Transform[O, T] = EmptyTransform[O, T] -} +trait Output[+Out] /** - * Default output. Does no reducing operations. + * Default output. * Allows to materialize a Flow with this output to Publisher. */ -final case class PublisherOut[+Out]() extends Output[Out] +final case class PublisherOut[+Out]() extends Output[Out] { + def publisher[O >: Out]: Publisher[O] = ??? +} /** * Output to a Subscriber. @@ -85,7 +87,7 @@ final case class SubscriberOut[+Out](s: Subscriber[_ <: Out]) extends Output[Out * Fold output. Reduces output stream according to the given fold function. */ final case class FoldOut[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Output[Out] { - override def transform[T, O >: Out]: Transform[O, T] = EmptyTransform[O, T] + def future: Future[T] = ??? } /** @@ -177,7 +179,7 @@ final case class OpenFlow[-In, +Out](transform: Transform[In, Out]) extends Flow type AfterCloseOutput[-In, +Out] = OpenInputFlow[In, Out] type AfterCloseInput[-In, +Out] = OpenOutputFlow[In, Out] - def withOutput[O](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform ++ out.transform) + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform) def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = OpenOutputFlow(in, transform) protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = OpenFlow(t ++ transform) @@ -199,7 +201,7 @@ final case class OpenOutputFlow[-In, +Out](input: Input[In], transform: Transfor override type Repr[-In, +Out] = OpenOutputFlow[In, Out] type AfterCloseOutput[-In, +Out] = ClosedFlow[In, Out] - def withOutput[O](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform ++ out.transform) + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform) def withoutInput: OpenFlow[In, Out] = OpenFlow(transform) protected def appendTransform[T](t: Transform[Out, T]) = OpenOutputFlow(input, transform ++ t) @@ -210,8 +212,6 @@ final case class ClosedFlow[-In, +Out](input: Input[In], output: Output[Out], tr def withoutInput: OpenInputFlow[In, Out] = OpenInputFlow(output, transform) def run(): Unit = () - - def as[O >: Out, R[_]](implicit materializer: FlowMaterializer.M[O, R[O]]): R[O] = ??? } trait Transform[-In, +Out] { @@ -226,34 +226,3 @@ object FlattenStrategy { final case class ConcatOpenOutputFlow[In, Out]() extends FlattenStrategy[OpenOutputFlow[In, Out], Out] final case class ConcatOpenFlow[In, Out]() extends FlattenStrategy[OpenFlow[In, Out], Out] } - -/** - * At the end we need to materialize the stream. It could be done by setting an output - * with a transformation (e.g Out -> Publisher[Out]) which would do the materialization. - * - * Or we could have an additional step on ClosedFlow like the one Viktor suggested. - * https://github.com/akka/akka/issues/15633#issuecomment-52307292 - */ -object FlowMaterializer { - trait M[T, R] { - def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): R - } - - def publisher[T]: FlowMaterializer.M[T, Publisher[T]] = publisizer.asInstanceOf[FlowMaterializer.M[T, Publisher[T]]] - private[this] final val publisizer = mkPublisher[Any] - private[this] def mkPublisher[T] = new FlowMaterializer.M[T, Publisher[T]] { - def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Publisher[T] = ??? - } - - def subscriber[T]: FlowMaterializer.M[T, Subscriber[T]] = publisizer.asInstanceOf[FlowMaterializer.M[T, Subscriber[T]]] - private[this] final val sbscrisizer = mkSubscriber[Any] - private[this] def mkSubscriber[T] = new FlowMaterializer.M[T, Subscriber[T]] { - def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Subscriber[T] = ??? - } - - def future[T]: FlowMaterializer.M[T, Future[T]] = futurizer.asInstanceOf[M[T, Future[T]]] - private[this] final val futurizer = mkFuture[Any] - private[this] def mkFuture[T] = new M[T, Future[T]] { - def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Future[T] = ??? - } -} diff --git a/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala index 819aa5552c..4e3f60bae8 100644 --- a/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala @@ -28,7 +28,7 @@ class CombinatorSpec extends WordSpec with Matchers { } "fold" in { val fo = FoldOut("elements:") { (soFar, element: Int) ⇒ soFar + element } - val t: OpenInputFlow[Int, String] = f.withOutput(fo) + val t: OpenInputFlow[Int, Int] = f.withOutput(fo) } "drop" in { val t: OpenFlow[Int, Int] = f.drop(2)