From 5384c4f098ff54a242e96544750479c69f9295b9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 26 Aug 2014 17:59:12 +0200 Subject: [PATCH 1/3] =str Move new dsl to scaladsl2 package --- .../src/main/scala/akka/stream/{dsl => scaladsl2}/Flow.scala | 5 ++++- .../main/scala/akka/stream/{dsl => scaladsl2}/Graph.scala | 5 ++++- .../akka/stream/{dsl => scaladsl2}/CombinatorSpec.scala | 5 ++++- .../test/scala/akka/stream/{dsl => scaladsl2}/FlowSpec.scala | 5 ++++- .../scala/akka/stream/{dsl => scaladsl2}/GraphSpec.scala | 5 ++++- 5 files changed, 20 insertions(+), 5 deletions(-) rename akka-stream/src/main/scala/akka/stream/{dsl => scaladsl2}/Flow.scala (98%) rename akka-stream/src/main/scala/akka/stream/{dsl => scaladsl2}/Graph.scala (83%) rename akka-stream/src/test/scala/akka/stream/{dsl => scaladsl2}/CombinatorSpec.scala (96%) rename akka-stream/src/test/scala/akka/stream/{dsl => scaladsl2}/FlowSpec.scala (97%) rename akka-stream/src/test/scala/akka/stream/{dsl => scaladsl2}/GraphSpec.scala (93%) diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala similarity index 98% rename from akka-stream/src/main/scala/akka/stream/dsl/Flow.scala rename to akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index aa2edb4dd8..bf3ba50e49 100644 --- a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -1,4 +1,7 @@ -package akka.stream.dsl +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 import akka.stream.impl.Ast import org.reactivestreams.{ Subscriber, Publisher } diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Graph.scala similarity index 83% rename from akka-stream/src/main/scala/akka/stream/dsl/Graph.scala rename to akka-stream/src/main/scala/akka/stream/scaladsl2/Graph.scala index 3030f6cff9..b119c3e0af 100644 --- a/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Graph.scala @@ -1,4 +1,7 @@ -package akka.stream.dsl +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 final case class Merge[T, U, V >: T with U]() { val in1 = new Output[T] {} diff --git a/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala similarity index 96% rename from akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala rename to akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala index 4e3f60bae8..2f6109ce8d 100644 --- a/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala @@ -1,4 +1,7 @@ -package akka.stream.dsl +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 import org.scalatest.Matchers import org.scalatest.WordSpec diff --git a/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala similarity index 97% rename from akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala rename to akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala index fe6cee34dd..b3244648b9 100644 --- a/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala @@ -1,4 +1,7 @@ -package akka.stream.dsl +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 import org.scalatest.{ Matchers, WordSpec } diff --git a/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/GraphSpec.scala similarity index 93% rename from akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala rename to akka-stream/src/test/scala/akka/stream/scaladsl2/GraphSpec.scala index 46647af26f..d1c30bff46 100644 --- a/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/GraphSpec.scala @@ -1,4 +1,7 @@ -package akka.stream.dsl +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 import org.scalatest.{ WordSpec, Matchers } From 37f94c9d8d2ca872de39837adcf69b1e30dcf945 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 26 Aug 2014 18:11:11 +0200 Subject: [PATCH 2/3] =str #15746 Rename flow classes --- .../scala/akka/stream/scaladsl2/Flow.scala | 94 +++++++++++-------- .../{Graph.scala => FlowGraph.scala} | 0 .../stream/scaladsl2/CombinatorSpec.scala | 42 ++++----- .../{GraphSpec.scala => FlowGraphSpec.scala} | 2 +- .../akka/stream/scaladsl2/FlowSpec.scala | 62 ++++++------ 5 files changed, 106 insertions(+), 94 deletions(-) rename akka-stream/src/main/scala/akka/stream/scaladsl2/{Graph.scala => FlowGraph.scala} (100%) rename akka-stream/src/test/scala/akka/stream/scaladsl2/{GraphSpec.scala => FlowGraphSpec.scala} (98%) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index bf3ba50e49..fc240e390e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -20,24 +20,24 @@ object From { * Helper to create Flow without Input. * Example usage: From[Int] */ - def apply[T]: OpenFlow[T, T] = OpenFlow[T, T](EmptyTransform[T, T]()) + def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](EmptyTransform[T, T]()) /** * Helper to create Flow with Input from Iterable. * Example usage: Flow(Seq(1,2,3)) */ - def apply[T](i: immutable.Iterable[T]): OpenOutputFlow[T, T] = From[T].withInput(IterableIn(i)) + def apply[T](i: immutable.Iterable[T]): PublisherFlow[T, T] = From[T].withInput(IterableIn(i)) /** * Helper to create Flow with Input from Future. * Example usage: Flow(Future { 1 }) */ - def apply[T](f: Future[T]): OpenOutputFlow[T, T] = From[T].withInput(FutureIn(f)) + def apply[T](f: Future[T]): PublisherFlow[T, T] = From[T].withInput(FutureIn(f)) /** * Helper to create Flow with Input from Publisher. */ - def apply[T](p: Publisher[T]): OpenOutputFlow[T, T] = From[T].withInput(PublisherIn(p)) + def apply[T](p: Publisher[T]): PublisherFlow[T, T] = From[T].withInput(PublisherIn(p)) } trait Input[-In] @@ -107,9 +107,9 @@ sealed trait HasOpenInput[-In, +Out] { protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] // linear combinators with flows - def prepend[T](f: OpenFlow[T, In]): Repr[T, Out] = + def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = prependTransform(f.transform) - def prepend[T](f: OpenOutputFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = + def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = prependTransform(f.transform).withInput(f.input) } @@ -159,60 +159,72 @@ trait HasOpenOutput[-In, +Out] { appendTransform(EmptyTransform[Out, Out]()) // linear combinators which produce multiple flows - def prefixAndTail[O >: Out](n: Int): Repr[In, (immutable.Seq[O], OpenOutputFlow[O, O])] = - appendTransform(EmptyTransform[Out, (immutable.Seq[O], OpenOutputFlow[O, O])]()) - def groupBy[O >: Out, K](f: O ⇒ K): Repr[In, (K, OpenOutputFlow[O, O])] = - appendTransform(EmptyTransform[Out, (K, OpenOutputFlow[O, O])]()) - def splitWhen[O >: Out](p: Out ⇒ Boolean): Repr[In, OpenOutputFlow[O, O]] = - appendTransform(EmptyTransform[Out, OpenOutputFlow[O, O]]()) + def prefixAndTail[O >: Out](n: Int): Repr[In, (immutable.Seq[O], PublisherFlow[O, O])] = + appendTransform(EmptyTransform[Out, (immutable.Seq[O], PublisherFlow[O, O])]()) + def groupBy[O >: Out, K](f: O ⇒ K): Repr[In, (K, PublisherFlow[O, O])] = + appendTransform(EmptyTransform[Out, (K, PublisherFlow[O, O])]()) + def splitWhen[O >: Out](p: Out ⇒ Boolean): Repr[In, PublisherFlow[O, O]] = + appendTransform(EmptyTransform[Out, PublisherFlow[O, O]]()) // linear combinators which consume multiple flows def flatten[T](strategy: FlattenStrategy[Out, T]): Repr[In, T] = appendTransform(EmptyTransform[Out, T]()) // linear combinators with flows - def append[T](f: OpenFlow[Out, T]): Repr[In, T] = + def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = appendTransform(f.transform) - def append[T](f: OpenInputFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = + def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = appendTransform(f.transform).withOutput(f.output) } -final case class OpenFlow[-In, +Out](transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] with HasOpenInput[In, Out] { - override type Repr[-In, +Out] = OpenFlow[In, Out] - type AfterCloseOutput[-In, +Out] = OpenInputFlow[In, Out] - type AfterCloseInput[-In, +Out] = OpenOutputFlow[In, Out] +/** + * Flow without attached input and without attached output, can be used as a `Processor`. + */ +final case class ProcessorFlow[-In, +Out](transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] with HasOpenInput[In, Out] { + override type Repr[-In, +Out] = ProcessorFlow[In, Out] + type AfterCloseOutput[-In, +Out] = SubscriberFlow[In, Out] + type AfterCloseInput[-In, +Out] = PublisherFlow[In, Out] - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform) - def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = OpenOutputFlow(in, transform) + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = SubscriberFlow(out, transform) + def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = PublisherFlow(in, transform) - protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = OpenFlow(t ++ transform) - protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] = OpenFlow(transform ++ t) + protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = ProcessorFlow(t ++ transform) + protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] = ProcessorFlow(transform ++ t) } -final case class OpenInputFlow[-In, +Out](output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenInput[In, Out] { - type Repr[-In, +Out] = OpenInputFlow[In, Out] - type AfterCloseInput[-In, +Out] = ClosedFlow[In, Out] +/** + * Flow with attached output, can be used as a `Subscriber`. + */ +final case class SubscriberFlow[-In, +Out](output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenInput[In, Out] { + type Repr[-In, +Out] = SubscriberFlow[In, Out] + type AfterCloseInput[-In, +Out] = RunnableFlow[In, Out] - def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = ClosedFlow(in, output, transform) - def withoutOutput: OpenFlow[In, Out] = OpenFlow(transform) + def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = RunnableFlow(in, output, transform) + def withoutOutput: ProcessorFlow[In, Out] = ProcessorFlow(transform) protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = - OpenInputFlow(output, t ++ transform) + SubscriberFlow(output, t ++ transform) } -final case class OpenOutputFlow[-In, +Out](input: Input[In], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] { - override type Repr[-In, +Out] = OpenOutputFlow[In, Out] - type AfterCloseOutput[-In, +Out] = ClosedFlow[In, Out] +/** + * Flow with attached input, can be used as a `Publisher`. + */ +final case class PublisherFlow[-In, +Out](input: Input[In], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] { + override type Repr[-In, +Out] = PublisherFlow[In, Out] + type AfterCloseOutput[-In, +Out] = RunnableFlow[In, Out] - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform) - def withoutInput: OpenFlow[In, Out] = OpenFlow(transform) + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = RunnableFlow(input, out, transform) + def withoutInput: ProcessorFlow[In, Out] = ProcessorFlow(transform) - protected def appendTransform[T](t: Transform[Out, T]) = OpenOutputFlow(input, transform ++ t) + protected def appendTransform[T](t: Transform[Out, T]) = PublisherFlow(input, transform ++ t) } -final case class ClosedFlow[-In, +Out](input: Input[In], output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] { - def withoutOutput: OpenOutputFlow[In, Out] = OpenOutputFlow(input, transform) - def withoutInput: OpenInputFlow[In, Out] = OpenInputFlow(output, transform) +/** + * Flow with attached input and output, can be executed. + */ +final case class RunnableFlow[-In, +Out](input: Input[In], output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] { + def withoutOutput: PublisherFlow[In, Out] = PublisherFlow(input, transform) + def withoutInput: SubscriberFlow[In, Out] = SubscriberFlow(output, transform) def run(): Unit = () } @@ -223,9 +235,9 @@ trait Transform[-In, +Out] { final case class EmptyTransform[-In, +Out]() extends Transform[In, Out] object FlattenStrategy { - def concatOpenOutputFlow[In, Out]: FlattenStrategy[OpenOutputFlow[In, Out], Out] = ConcatOpenOutputFlow[In, Out]() - def concatOpenFlow[In, Out]: FlattenStrategy[OpenFlow[In, Out], Out] = ConcatOpenFlow[In, Out]() + def concatPublisherFlow[In, Out]: FlattenStrategy[PublisherFlow[In, Out], Out] = ConcatPublisherFlow[In, Out]() + def concatProcessorFlow[In, Out]: FlattenStrategy[ProcessorFlow[In, Out], Out] = ConcatProcessorFlow[In, Out]() - final case class ConcatOpenOutputFlow[In, Out]() extends FlattenStrategy[OpenOutputFlow[In, Out], Out] - final case class ConcatOpenFlow[In, Out]() extends FlattenStrategy[OpenFlow[In, Out], Out] + final case class ConcatPublisherFlow[In, Out]() extends FlattenStrategy[PublisherFlow[In, Out], Out] + final case class ConcatProcessorFlow[In, Out]() extends FlattenStrategy[ProcessorFlow[In, Out], Out] } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala similarity index 100% rename from akka-stream/src/main/scala/akka/stream/scaladsl2/Graph.scala rename to akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala index 2f6109ce8d..1dc9a7d504 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala @@ -15,66 +15,66 @@ class CombinatorSpec extends WordSpec with Matchers { "Linear simple combinators in Flow" should { "map" in { - val t: OpenFlow[Int, Int] = f.map(_ * 2) + val t: ProcessorFlow[Int, Int] = f.map(_ * 2) } "mapFuture" in { import scala.concurrent.ExecutionContext.Implicits.global - val t: OpenFlow[Int, Int] = f.mapFuture(Future(_)) + val t: ProcessorFlow[Int, Int] = f.mapFuture(Future(_)) } "filter" in { - val t: OpenFlow[Int, Int] = f.filter(_ != 2) + val t: ProcessorFlow[Int, Int] = f.filter(_ != 2) } "collect" in { - val t: OpenFlow[Int, String] = f.collect { + val t: ProcessorFlow[Int, String] = f.collect { case i: Int if i == 2 ⇒ "two" } } "fold" in { val fo = FoldOut("elements:") { (soFar, element: Int) ⇒ soFar + element } - val t: OpenInputFlow[Int, Int] = f.withOutput(fo) + val t: SubscriberFlow[Int, Int] = f.withOutput(fo) } "drop" in { - val t: OpenFlow[Int, Int] = f.drop(2) + val t: ProcessorFlow[Int, Int] = f.drop(2) } "dropWithin" in { - val t: OpenFlow[Int, Int] = f.dropWithin(2.seconds) + val t: ProcessorFlow[Int, Int] = f.dropWithin(2.seconds) } "take" in { - val t: OpenFlow[Int, Int] = f.take(2) + val t: ProcessorFlow[Int, Int] = f.take(2) } "takeWithin" in { - val t: OpenFlow[Int, Int] = f.takeWithin(2.seconds) + val t: ProcessorFlow[Int, Int] = f.takeWithin(2.seconds) } "grouped" in { - val t: OpenFlow[Int, immutable.Seq[Int]] = f.grouped(2) + val t: ProcessorFlow[Int, immutable.Seq[Int]] = f.grouped(2) } "groupedWithin" in { - val t: OpenFlow[Int, immutable.Seq[Int]] = f.groupedWithin(2, 2.seconds) + val t: ProcessorFlow[Int, immutable.Seq[Int]] = f.groupedWithin(2, 2.seconds) } "mapConcat" in { - val t: OpenFlow[Int, Int] = f.mapConcat { i ⇒ immutable.Seq(i, i, i) } + val t: ProcessorFlow[Int, Int] = f.mapConcat { i ⇒ immutable.Seq(i, i, i) } } "conflate" in { - val t: OpenFlow[Int, String] = f.conflate(_.toString, (soFar: String, i) ⇒ soFar + i) + val t: ProcessorFlow[Int, String] = f.conflate(_.toString, (soFar: String, i) ⇒ soFar + i) } "expand" in { - val t: OpenFlow[Int, String] = f.expand(_.toString, (soFar: String) ⇒ (soFar, "_")) + val t: ProcessorFlow[Int, String] = f.expand(_.toString, (soFar: String) ⇒ (soFar, "_")) } "buffer" in { - val t: OpenFlow[Int, Int] = f.buffer(100, OverflowStrategy.DropHead) + val t: ProcessorFlow[Int, Int] = f.buffer(100, OverflowStrategy.DropHead) } } "Linear combinators which produce multiple flows" should { "prefixAndTail" in { - val t: OpenFlow[Int, (immutable.Seq[String], OpenOutputFlow[String, String])] = + val t: ProcessorFlow[Int, (immutable.Seq[String], PublisherFlow[String, String])] = f.map(_.toString).prefixAndTail(10) } "groupBy" in { - val grouped: OpenOutputFlow[Int, (String, OpenOutputFlow[Int, Int])] = + val grouped: PublisherFlow[Int, (String, PublisherFlow[Int, Int])] = From(immutable.Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o.toString) - val closedInner: OpenOutputFlow[Int, (String, ClosedFlow[Int, Int])] = grouped.map { + val closedInner: PublisherFlow[Int, (String, RunnableFlow[Int, Int])] = grouped.map { case (key, openFlow) ⇒ (key, openFlow.withOutput(PublisherOut())) } @@ -83,14 +83,14 @@ class CombinatorSpec extends WordSpec with Matchers { closedInner.withOutput(PublisherOut()).run } "splitWhen" in { - val t: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) + val t: ProcessorFlow[Int, PublisherFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) } } "Linear combinators which consume multiple flows" should { "flatten" in { - val split: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) - val flattened: OpenFlow[Int, String] = split.flatten(FlattenStrategy.concatOpenOutputFlow) + val split: ProcessorFlow[Int, PublisherFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) + val flattened: ProcessorFlow[Int, String] = split.flatten(FlattenStrategy.concatPublisherFlow) } } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/GraphSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphSpec.scala similarity index 98% rename from akka-stream/src/test/scala/akka/stream/scaladsl2/GraphSpec.scala rename to akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphSpec.scala index d1c30bff46..2c966f061e 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/GraphSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphSpec.scala @@ -9,7 +9,7 @@ import scala.collection.immutable.Seq class GraphSpec extends WordSpec with Matchers { - "Graph" should { + "A Flow Graph" should { "merge" in { val merge = Merge[Int, Int, Int]() diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala index b3244648b9..ec69b5d2c5 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala @@ -16,74 +16,74 @@ class FlowSpec extends WordSpec with Matchers { import scala.concurrent.ExecutionContext.Implicits.global val intFut = FutureIn(Future { 3 }) - "OpenFlow" should { + "ProcessorFlow" should { "go through all states" in { - val f: OpenFlow[Int, Int] = From[Int] + val f: ProcessorFlow[Int, Int] = From[Int] .withInput(intSeq) .withOutput(PublisherOut()) .withoutInput .withoutOutput } "should not run" in { - val open: OpenFlow[Int, Int] = From[Int] + val open: ProcessorFlow[Int, Int] = From[Int] "open.run" shouldNot compile } "accept IterableIn" in { - val f: OpenOutputFlow[Int, Int] = From[Int].withInput(intSeq) + val f: PublisherFlow[Int, Int] = From[Int].withInput(intSeq) } "accept FutureIn" in { - val f: OpenOutputFlow[Int, Int] = From[Int].withInput(intFut) + val f: PublisherFlow[Int, Int] = From[Int].withInput(intFut) } - "append OpenFlow" in { - val open1: OpenFlow[Int, String] = From[Int].map(_.toString) - val open2: OpenFlow[String, Int] = From[String].map(_.hashCode) - val open3: OpenFlow[Int, Int] = open1.append(open2) + "append ProcessorFlow" in { + val open1: ProcessorFlow[Int, String] = From[Int].map(_.toString) + val open2: ProcessorFlow[String, Int] = From[String].map(_.hashCode) + val open3: ProcessorFlow[Int, Int] = open1.append(open2) "open3.run" shouldNot compile - val closedInput: OpenOutputFlow[Int, Int] = open3.withInput(intSeq) + val closedInput: PublisherFlow[Int, Int] = open3.withInput(intSeq) "closedInput.run" shouldNot compile - val closedOutput: OpenInputFlow[Int, Int] = open3.withOutput(PublisherOut()) + val closedOutput: SubscriberFlow[Int, Int] = open3.withOutput(PublisherOut()) "closedOutput.run" shouldNot compile closedInput.withOutput(PublisherOut()).run closedOutput.withInput(intSeq).run } - "prepend OpenFlow" in { - val open1: OpenFlow[Int, String] = From[Int].map(_.toString) - val open2: OpenFlow[String, Int] = From[String].map(_.hashCode) - val open3: OpenFlow[String, String] = open1.prepend(open2) + "prepend ProcessorFlow" in { + val open1: ProcessorFlow[Int, String] = From[Int].map(_.toString) + val open2: ProcessorFlow[String, Int] = From[String].map(_.hashCode) + val open3: ProcessorFlow[String, String] = open1.prepend(open2) "open3.run" shouldNot compile - val closedInput: OpenOutputFlow[String, String] = open3.withInput(strSeq) + val closedInput: PublisherFlow[String, String] = open3.withInput(strSeq) "closedInput.run" shouldNot compile - val closedOutput: OpenInputFlow[String, String] = open3.withOutput(PublisherOut()) + val closedOutput: SubscriberFlow[String, String] = open3.withOutput(PublisherOut()) "closedOutput.run" shouldNot compile closedInput.withOutput(PublisherOut()).run closedOutput.withInput(strSeq).run } - "append OpenInputFlow" in { - val open: OpenFlow[Int, String] = From[Int].map(_.toString) - val closedOutput: OpenInputFlow[String, Int] = From[String].map(_.hashCode).withOutput(PublisherOut()) - val appended: OpenInputFlow[Int, Int] = open.append(closedOutput) + "append SubscriberFlow" in { + val open: ProcessorFlow[Int, String] = From[Int].map(_.toString) + val closedOutput: SubscriberFlow[String, Int] = From[String].map(_.hashCode).withOutput(PublisherOut()) + val appended: SubscriberFlow[Int, Int] = open.append(closedOutput) "appended.run" shouldNot compile "appended.toFuture" shouldNot compile appended.withInput(intSeq).run } - "prepend OpenOutputFlow" in { - val open: OpenFlow[Int, String] = From[Int].map(_.toString) - val closedInput: OpenOutputFlow[String, Int] = From[String].map(_.hashCode).withInput(strSeq) - val prepended: OpenOutputFlow[String, String] = open.prepend(closedInput) + "prepend PublisherFlow" in { + val open: ProcessorFlow[Int, String] = From[Int].map(_.toString) + val closedInput: PublisherFlow[String, Int] = From[String].map(_.hashCode).withInput(strSeq) + val prepended: PublisherFlow[String, String] = open.prepend(closedInput) "prepended.run" shouldNot compile "prepended.withInput(strSeq)" shouldNot compile prepended.withOutput(PublisherOut()).run } } - "OpenInputFlow" should { - val openInput: OpenInputFlow[Int, String] = + "SubscriberFlow" should { + val openInput: SubscriberFlow[Int, String] = From[Int].map(_.toString).withOutput(PublisherOut()) "accept Input" in { openInput.withInput(intSeq) @@ -102,8 +102,8 @@ class FlowSpec extends WordSpec with Matchers { } } - "OpenOutputFlow" should { - val openOutput: OpenOutputFlow[Int, String] = + "PublisherFlow" should { + val openOutput: PublisherFlow[Int, String] = From(Seq(1, 2, 3)).map(_.toString) "accept Output" in { openOutput.withOutput(PublisherOut()) @@ -122,8 +122,8 @@ class FlowSpec extends WordSpec with Matchers { } } - "ClosedFlow" should { - val closed: ClosedFlow[Int, String] = + "RunnableFlow" should { + val closed: RunnableFlow[Int, String] = From(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut()) "run" in { closed.run From b0051c492e7c3304ced98948a02b457a2f7729c2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 27 Aug 2014 18:21:44 +0200 Subject: [PATCH 3/3] =str #15744 Hook-up transform to the AST nodes * Removed unused parts of the new dsl, such as FlowGraph * Materialization of transform, still with old way of terminal operators, input/output factories will come next * New ActorBasedFlowMaterializer in impl2 --- .../server/NewDslHttpServerPipeline.scala | 114 ----- .../akka/stream/impl/ActorProcessor.scala | 2 +- .../impl2/ActorBasedFlowMaterializer.scala | 171 ++++++++ .../scala/akka/stream/scaladsl2/Flow.scala | 197 ++++----- .../akka/stream/scaladsl2/FlowGraph.scala | 28 -- .../stream/scaladsl2/FlowMaterializer.scala | 81 ++++ .../stream/scaladsl2/CombinatorSpec.scala | 77 +--- .../akka/stream/scaladsl2/FlowGraphSpec.scala | 60 --- .../akka/stream/scaladsl2/FlowSpec.scala | 68 +-- .../stream/scaladsl2/FlowTransformSpec.scala | 403 ++++++++++++++++++ 10 files changed, 778 insertions(+), 423 deletions(-) delete mode 100644 akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala create mode 100644 akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala delete mode 100644 akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphSpec.scala create mode 100644 akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala diff --git a/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala deleted file mode 100644 index 4a9defde83..0000000000 --- a/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala +++ /dev/null @@ -1,114 +0,0 @@ -package akka.http.server - -import akka.event.LoggingAdapter -import akka.http.Http -import akka.http.model.{ ErrorInfo, HttpRequest, HttpResponse } -import akka.http.parsing.HttpRequestParser -import akka.http.parsing.ParserOutput._ -import akka.http.rendering.ResponseRenderingContext -import akka.stream.dsl._ -import akka.stream.io.StreamTcp -import akka.stream.{ FlowMaterializer, Transformer } -import akka.util.ByteString - -class NewDslHttpServerPipeline(settings: ServerSettings, - materializer: FlowMaterializer, - log: LoggingAdapter) { - - import akka.http.server.NewDslHttpServerPipeline._ - - val rootParser = new HttpRequestParser(settings.parserSettings, settings.rawRequestUriHeader, materializer)() - val warnOnIllegalHeader: ErrorInfo ⇒ Unit = errorInfo ⇒ - if (settings.parserSettings.illegalHeaderWarnings) - log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty) - - val responseRendererFactory = new { - def newRenderer: Transformer[ResponseRenderingContext, OpenOutputFlow[ByteString, ByteString]] = ??? - } - - /** - * Flow graph: - * - * tcpConn.inputStream ---> requestFlowBeforeBroadcast -+-> requestFlowAfterBroadcast ---> Publisher[HttpRequest] - * | - * \-> applicationBypassFlow -\ - * | - * Subscriber[HttpResponse] ---> responseFlowBeforeMerge -+-> responseFlowAfterMerge --> tcpConn.outputStream - */ - def apply(tcpConn: StreamTcp.IncomingTcpConnection) = { - - val broadcast = Broadcast[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]() - val merge = Merge[MessageStart, HttpResponse, Any]() - - val requestFlowBeforeBroadcast: ClosedFlow[ByteString, (RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] = - From(tcpConn.inputStream) - .transform(rootParser.copyWith(warnOnIllegalHeader)) - .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) - .headAndTail() - .withOutput(broadcast.in) - - val applicationBypassFlow: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), MessageStart] = - From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] - .withInput(broadcast.out1) - .collect[MessageStart with RequestOutput] { case (x: MessageStart, _) ⇒ x } - .withOutput(merge.in1) - - val requestPublisher = PublisherOut[HttpRequest]() - val requestFlowAfterBroadcast: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), HttpRequest] = - From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] - .withInput(broadcast.out2) - .collect { - case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts) ⇒ - val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) - val publisher = PublisherOut[RequestOutput]() - val flow = entityParts.withOutput(publisher) - HttpRequest(method, effectiveUri, headers, createEntity(publisher.publisher), protocol) - } - .withOutput(requestPublisher) - - val responseSubscriber = SubscriberIn[HttpResponse]() - val responseFlowBeforeMerge: ClosedFlow[HttpResponse, HttpResponse] = - From[HttpResponse] - .withInput(responseSubscriber) - .withOutput(merge.in2) - - val responseFlowAfterMerge: ClosedFlow[Any, ByteString] = - From[Any] - .withInput(merge.out) - .transform(applyApplicationBypass) - .transform(responseRendererFactory.newRenderer) - .flatten(FlattenStrategy.concatOpenOutputFlow) - .transform(errorLogger(log, "Outgoing response stream error")) - .withOutput(SubscriberOut(tcpConn.outputStream)) - - Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher.publisher, responseSubscriber.subscriber) - } - - def applyApplicationBypass: Transformer[Any, ResponseRenderingContext] = ??? - - private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = ??? -} - -object NewDslHttpServerPipeline { - - /** - * FIXME: We can't use `HasOpenOutput` here, because conversion would convert either `OpenFlow` - * or `OpenOutputFlow` to `HasOpenOutput`. - * - * Therefore we need two separate conversions, one for `OpeFlow` another for `OpenOutputFlow`. - */ - implicit class OpenOutputFlowWithHeadAndTail[In, InnerIn, InnerOut](val underlying: OpenOutputFlow[In, OpenOutputFlow[InnerIn, InnerOut]]) extends AnyVal { - def headAndTail(): OpenOutputFlow[In, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] = { - val flow: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], OpenOutputFlow[InnerIn, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])]] = - From[OpenOutputFlow[InnerIn, InnerOut]] - .map { f ⇒ - f.prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) } - } - - val flattened: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] = - flow.flatten(FlattenStrategy.concatOpenOutputFlow) - - underlying.append(flattened) - } - } -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 6a890ed28e..b084e06348 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -45,7 +45,7 @@ private[akka] object ActorProcessor { /** * INTERNAL API */ -private[akka] class ActorProcessor[I, O] private (impl: ActorRef) extends ActorPublisher[O](impl, None) +private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[O](impl, None) with Processor[I, O] { override def onSubscribe(s: Subscription): Unit = impl ! OnSubscribe(s) override def onError(t: Throwable): Unit = impl ! OnError(t) diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala new file mode 100644 index 0000000000..a6024b9f51 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -0,0 +1,171 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl2 + +import java.util.concurrent.atomic.AtomicLong +import akka.actor.{ Actor, ActorCell, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, LocalActorRef, Props, RepointableActorRef } +import akka.pattern.ask +import org.reactivestreams.{ Processor, Publisher, Subscriber } +import scala.annotation.tailrec +import scala.collection.immutable +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ +import scala.util.{ Failure, Success } +import akka.stream.Transformer +import akka.stream.scaladsl2.FlowMaterializer +import akka.stream.MaterializerSettings +import akka.stream.impl.EmptyPublisher +import akka.stream.impl.ActorPublisher +import akka.stream.impl.IterablePublisher +import akka.stream.impl.TransformProcessorImpl +import akka.stream.impl.ActorProcessor +import akka.stream.impl.ExposedPublisher + +/** + * INTERNAL API + */ +private[akka] object Ast { + sealed trait AstNode { + def name: String + } + + case class Transform(name: String, mkTransformer: () ⇒ Transformer[Any, Any]) extends AstNode + + trait PublisherNode[I] { + private[akka] def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] + } + + final case class ExistingPublisher[I](publisher: Publisher[I]) extends PublisherNode[I] { + def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String) = publisher + } + + final case class IterablePublisherNode[I](iterable: immutable.Iterable[I]) extends PublisherNode[I] { + def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] = + if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]] + else ActorPublisher[I](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings), + name = s"$flowName-0-iterable"), Some(iterable)) + } + +} + +/** + * INTERNAL API + */ +private[akka] case class ActorBasedFlowMaterializer( + override val settings: MaterializerSettings, + supervisor: ActorRef, + flowNameCounter: AtomicLong, + namePrefix: String) + extends FlowMaterializer(settings) { + import akka.stream.impl2.Ast._ + + def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name) + + private def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet() + + private def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}" + + @tailrec private def processorChain(topSubscriber: Subscriber[_], ops: immutable.Seq[AstNode], + flowName: String, n: Int): Subscriber[_] = { + ops match { + case op :: tail ⇒ + val opProcessor: Processor[Any, Any] = processorForNode(op, flowName, n) + opProcessor.subscribe(topSubscriber.asInstanceOf[Subscriber[Any]]) + processorChain(opProcessor, tail, flowName, n - 1) + case _ ⇒ topSubscriber + } + } + + // Ops come in reverse order + override def toPublisher[I, O](publisherNode: PublisherNode[I], ops: List[AstNode]): Publisher[O] = { + val flowName = createFlowName() + if (ops.isEmpty) publisherNode.createPublisher(this, flowName).asInstanceOf[Publisher[O]] + else { + val opsSize = ops.size + val opProcessor = processorForNode(ops.head, flowName, opsSize) + val topSubscriber = processorChain(opProcessor, ops.tail, flowName, opsSize - 1) + publisherNode.createPublisher(this, flowName).subscribe(topSubscriber.asInstanceOf[Subscriber[I]]) + opProcessor.asInstanceOf[Publisher[O]] + } + } + + private val identityTransform = Transform("identity", () ⇒ + new Transformer[Any, Any] { + override def onNext(element: Any) = List(element) + }) + + def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { + val impl = actorOf(ActorProcessorFactory.props(settings, op), s"$flowName-$n-${op.name}") + ActorProcessorFactory(impl) + } + + def actorOf(props: Props, name: String): ActorRef = supervisor match { + case ref: LocalActorRef ⇒ + ref.underlying.attachChild(props, name, systemService = false) + case ref: RepointableActorRef ⇒ + if (ref.isStarted) + ref.underlying.asInstanceOf[ActorCell].attachChild(props, name, systemService = false) + else { + implicit val timeout = ref.system.settings.CreationTimeout + val f = (supervisor ? StreamSupervisor.Materialize(props, name)).mapTo[ActorRef] + Await.result(f, timeout.duration) + } + case _ ⇒ + throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]") + } + +} + +/** + * INTERNAL API + */ +private[akka] object FlowNameCounter extends ExtensionId[FlowNameCounter] with ExtensionIdProvider { + override def get(system: ActorSystem): FlowNameCounter = super.get(system) + override def lookup = FlowNameCounter + override def createExtension(system: ExtendedActorSystem): FlowNameCounter = new FlowNameCounter +} + +/** + * INTERNAL API + */ +private[akka] class FlowNameCounter extends Extension { + val counter = new AtomicLong(0) +} + +/** + * INTERNAL API + */ +private[akka] object StreamSupervisor { + def props(settings: MaterializerSettings): Props = Props(new StreamSupervisor(settings)) + + case class Materialize(props: Props, name: String) +} + +private[akka] class StreamSupervisor(settings: MaterializerSettings) extends Actor { + import StreamSupervisor._ + + def receive = { + case Materialize(props, name) ⇒ + val impl = context.actorOf(props, name) + sender() ! impl + } +} + +/** + * INTERNAL API + */ +private[akka] object ActorProcessorFactory { + + import Ast._ + def props(settings: MaterializerSettings, op: AstNode): Props = + (op match { + case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer())) + }).withDispatcher(settings.dispatcher) + + def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = { + val p = new ActorProcessor[I, O](impl) + impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]]) + p + } +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index fc240e390e..832bc9d17b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -3,41 +3,34 @@ */ package akka.stream.scaladsl2 -import akka.stream.impl.Ast -import org.reactivestreams.{ Subscriber, Publisher } - +import scala.language.higherKinds import scala.collection.immutable import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration -import akka.stream.{ Transformer, OverflowStrategy, FlattenStrategy } +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber +import akka.stream.Transformer +import akka.stream.impl.BlackholeSubscriber +import akka.stream.impl2.Ast._ -sealed trait Flow[-In, +Out] { - val transform: Transform[In, Out] -} +sealed trait Flow -object From { +object FlowFrom { /** - * Helper to create Flow without Input. - * Example usage: From[Int] + * Helper to create `Flow` without [[Input]]. + * Example usage: `FlowFrom[Int]` */ - def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](EmptyTransform[T, T]()) + def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil) /** - * Helper to create Flow with Input from Iterable. - * Example usage: Flow(Seq(1,2,3)) + * Helper to create `Flow` with Input from `Iterable`. + * Example usage: `FlowFrom(Seq(1,2,3))` */ - def apply[T](i: immutable.Iterable[T]): PublisherFlow[T, T] = From[T].withInput(IterableIn(i)) + def apply[T](i: immutable.Iterable[T]): PublisherFlow[T, T] = FlowFrom[T].withInput(IterableIn(i)) /** - * Helper to create Flow with Input from Future. - * Example usage: Flow(Future { 1 }) + * Helper to create `Flow` with [[Input]] from `Publisher`. */ - def apply[T](f: Future[T]): PublisherFlow[T, T] = From[T].withInput(FutureIn(f)) - - /** - * Helper to create Flow with Input from Publisher. - */ - def apply[T](p: Publisher[T]): PublisherFlow[T, T] = From[T].withInput(PublisherIn(p)) + def apply[T](p: Publisher[T]): PublisherFlow[T, T] = FlowFrom[T].withInput(PublisherIn(p)) } trait Input[-In] @@ -81,6 +74,10 @@ final case class PublisherOut[+Out]() extends Output[Out] { def publisher[O >: Out]: Publisher[O] = ??? } +final case class BlackholeOut[+Out]() extends Output[Out] { + def publisher[O >: Out]: Publisher[O] = ??? +} + /** * Output to a Subscriber. */ @@ -99,145 +96,123 @@ final case class FoldOut[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Output[Out * No Out type parameter would be useful for Graph signatures, but we need it here * for `withInput` and `prependTransform` methods. */ -sealed trait HasOpenInput[-In, +Out] { +sealed trait HasOpenInput[-In, +Out] extends Flow { type Repr[-In, +Out] <: HasOpenInput[In, Out] - type AfterCloseInput[-In, +Out] <: Flow[In, Out] + type AfterCloseInput[-In, +Out] <: Flow def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] - protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] - // linear combinators with flows - def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = - prependTransform(f.transform) - def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = - prependTransform(f.transform).withInput(f.input) + def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] + def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] + } /** * Operations with a Flow which has open (no attached) Output. * * No In type parameter would be useful for Graph signatures, but we need it here - * for `withOutput` and `appendTransform` methods. + * for `withOutput`. */ -trait HasOpenOutput[-In, +Out] { +trait HasOpenOutput[-In, +Out] extends Flow { type Repr[-In, +Out] <: HasOpenOutput[In, Out] - type AfterCloseOutput[-In, +Out] <: Flow[In, Out] + type AfterCloseOutput[-In, +Out] <: Flow + + // Storing ops in reverse order + protected def andThen[U](op: AstNode): Repr[In, U] def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] - protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] - // linear simple combinators def map[T](f: Out ⇒ T): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) - def mapFuture[T](f: Out ⇒ Future[T]): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) - def filter(p: Out ⇒ Boolean): Repr[In, Out] = - appendTransform(EmptyTransform[Out, Out]()) - def collect[T](pf: PartialFunction[Out, T]): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) - def drop(n: Int): Repr[In, Out] = - appendTransform(EmptyTransform[Out, Out]()) - def dropWithin(d: FiniteDuration): Repr[In, Out] = - appendTransform(EmptyTransform[Out, Out]()) - def take(n: Int): Repr[In, Out] = - appendTransform(EmptyTransform[Out, Out]()) - def takeWithin(d: FiniteDuration): Repr[In, Out] = - appendTransform(EmptyTransform[Out, Out]()) - def grouped(n: Int): Repr[In, immutable.Seq[Out]] = - appendTransform(EmptyTransform[Out, immutable.Seq[Out]]()) - def groupedWithin(n: Int, d: FiniteDuration): Repr[In, immutable.Seq[Out]] = - appendTransform(EmptyTransform[Out, immutable.Seq[Out]]()) - def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) - def transform[T](transformer: Transformer[Out, T]): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) - def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[In, S] = - appendTransform(EmptyTransform[Out, S]()) - def expand[S, O](seed: Out ⇒ S, extrapolate: S ⇒ (O, S)): Repr[In, O] = - appendTransform(EmptyTransform[Out, O]()) - def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[In, Out] = - appendTransform(EmptyTransform[Out, Out]()) + transform("map", () ⇒ new Transformer[Out, T] { + override def onNext(in: Out) = List(f(in)) + }) - // linear combinators which produce multiple flows - def prefixAndTail[O >: Out](n: Int): Repr[In, (immutable.Seq[O], PublisherFlow[O, O])] = - appendTransform(EmptyTransform[Out, (immutable.Seq[O], PublisherFlow[O, O])]()) - def groupBy[O >: Out, K](f: O ⇒ K): Repr[In, (K, PublisherFlow[O, O])] = - appendTransform(EmptyTransform[Out, (K, PublisherFlow[O, O])]()) - def splitWhen[O >: Out](p: Out ⇒ Boolean): Repr[In, PublisherFlow[O, O]] = - appendTransform(EmptyTransform[Out, PublisherFlow[O, O]]()) + def transform[T](name: String, mkTransformer: () ⇒ Transformer[Out, T]): Repr[In, T] = { + andThen(Transform(name, mkTransformer.asInstanceOf[() ⇒ Transformer[Any, Any]])) + } - // linear combinators which consume multiple flows - def flatten[T](strategy: FlattenStrategy[Out, T]): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) - - // linear combinators with flows - def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = - appendTransform(f.transform) - def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = - appendTransform(f.transform).withOutput(f.output) + def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] + def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] } /** * Flow without attached input and without attached output, can be used as a `Processor`. */ -final case class ProcessorFlow[-In, +Out](transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] with HasOpenInput[In, Out] { +final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends HasOpenOutput[In, Out] with HasOpenInput[In, Out] { override type Repr[-In, +Out] = ProcessorFlow[In, Out] type AfterCloseOutput[-In, +Out] = SubscriberFlow[In, Out] type AfterCloseInput[-In, +Out] = PublisherFlow[In, Out] - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = SubscriberFlow(out, transform) - def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = PublisherFlow(in, transform) + override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) - protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = ProcessorFlow(t ++ transform) - protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] = ProcessorFlow(transform ++ t) + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = SubscriberFlow(out, ops) + def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = PublisherFlow(in, ops) + + override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = + ProcessorFlow(ops ::: f.ops) + override def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = + PublisherFlow(f.input, ops ::: f.ops) + + override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = ProcessorFlow(f.ops ++: ops) + override def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = + SubscriberFlow(f.output, f.ops ++: ops) } /** * Flow with attached output, can be used as a `Subscriber`. */ -final case class SubscriberFlow[-In, +Out](output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenInput[In, Out] { +final case class SubscriberFlow[-In, +Out](output: Output[Out], ops: List[AstNode]) extends HasOpenInput[In, Out] { type Repr[-In, +Out] = SubscriberFlow[In, Out] type AfterCloseInput[-In, +Out] = RunnableFlow[In, Out] - def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = RunnableFlow(in, output, transform) - def withoutOutput: ProcessorFlow[In, Out] = ProcessorFlow(transform) + def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = RunnableFlow(in, output, ops) + def withoutOutput: ProcessorFlow[In, Out] = ProcessorFlow(ops) - protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = - SubscriberFlow(output, t ++ transform) + override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = + SubscriberFlow(output, ops ::: f.ops) + override def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = + RunnableFlow(f.input, output, ops ::: f.ops) } /** * Flow with attached input, can be used as a `Publisher`. */ -final case class PublisherFlow[-In, +Out](input: Input[In], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] { +final case class PublisherFlow[-In, +Out](input: Input[In], ops: List[AstNode]) extends HasOpenOutput[In, Out] { override type Repr[-In, +Out] = PublisherFlow[In, Out] type AfterCloseOutput[-In, +Out] = RunnableFlow[In, Out] - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = RunnableFlow(input, out, transform) - def withoutInput: ProcessorFlow[In, Out] = ProcessorFlow(transform) + override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) + + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = RunnableFlow(input, out, ops) + def withoutInput: ProcessorFlow[In, Out] = ProcessorFlow(ops) + + override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = PublisherFlow(input, f.ops ++: ops) + override def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = + RunnableFlow(input, f.output, f.ops ++: ops) - protected def appendTransform[T](t: Transform[Out, T]) = PublisherFlow(input, transform ++ t) } /** * Flow with attached input and output, can be executed. */ -final case class RunnableFlow[-In, +Out](input: Input[In], output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] { - def withoutOutput: PublisherFlow[In, Out] = PublisherFlow(input, transform) - def withoutInput: SubscriberFlow[In, Out] = SubscriberFlow(output, transform) +final case class RunnableFlow[-In, +Out](input: Input[In], output: Output[Out], ops: List[AstNode]) extends Flow { + def withoutOutput: PublisherFlow[In, Out] = PublisherFlow(input, ops) + def withoutInput: SubscriberFlow[In, Out] = SubscriberFlow(output, ops) + + // FIXME + def run()(implicit materializer: FlowMaterializer): Unit = + produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize)) + + // FIXME replace with run and input/output factories + def toPublisher[U >: Out]()(implicit materializer: FlowMaterializer): Publisher[U] = + input match { + case PublisherIn(p) ⇒ materializer.toPublisher(ExistingPublisher(p), ops) + case IterableIn(iter) ⇒ materializer.toPublisher(IterablePublisherNode(iter), ops) + case _ ⇒ ??? + } + + def produceTo(subscriber: Subscriber[_ >: Out])(implicit materializer: FlowMaterializer): Unit = + toPublisher().subscribe(subscriber.asInstanceOf[Subscriber[Out]]) - def run(): Unit = () } -trait Transform[-In, +Out] { - def ++[T](t: Transform[Out, T]): Transform[In, T] = EmptyTransform[In, T]() -} -final case class EmptyTransform[-In, +Out]() extends Transform[In, Out] - -object FlattenStrategy { - def concatPublisherFlow[In, Out]: FlattenStrategy[PublisherFlow[In, Out], Out] = ConcatPublisherFlow[In, Out]() - def concatProcessorFlow[In, Out]: FlattenStrategy[ProcessorFlow[In, Out], Out] = ConcatProcessorFlow[In, Out]() - - final case class ConcatPublisherFlow[In, Out]() extends FlattenStrategy[PublisherFlow[In, Out], Out] - final case class ConcatProcessorFlow[In, Out]() extends FlattenStrategy[ProcessorFlow[In, Out], Out] -} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala deleted file mode 100644 index b119c3e0af..0000000000 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.scaladsl2 - -final case class Merge[T, U, V >: T with U]() { - val in1 = new Output[T] {} - val in2 = new Output[U] {} - val out = new Input[V] {} -} - -final case class Zip[T, U]() { - val in1 = new Output[T] {} - val in2 = new Output[U] {} - val out = new Input[(T, U)] {} -} - -final case class Concat[T, U, V >: T with U]() { - val in1 = new Output[T] {} - val in2 = new Output[U] {} - val out = new Input[V] {} -} - -final case class Broadcast[T]() { - val in = new Output[T] {} - val out1 = new Input[T] {} - val out2 = new Input[T] {} -} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala new file mode 100644 index 0000000000..8bb91e81d5 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.concurrent.duration.FiniteDuration +import akka.actor.ActorRefFactory +import akka.stream.impl2.ActorBasedFlowMaterializer +import akka.stream.impl2.Ast +import org.reactivestreams.{ Publisher, Subscriber } +import scala.concurrent.duration._ +import akka.actor.Deploy +import akka.actor.ExtendedActorSystem +import akka.actor.ActorContext +import akka.stream.impl2.StreamSupervisor +import akka.stream.impl2.FlowNameCounter +import akka.stream.MaterializerSettings + +object FlowMaterializer { + + /** + * Scala API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = { + val system = context match { + case s: ExtendedActorSystem ⇒ s + case c: ActorContext ⇒ c.system + case null ⇒ throw new IllegalArgumentException("ActorRefFactory context must be defined") + case _ ⇒ throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, " + + "got [${_contex.getClass.getName}]") + } + + new ActorBasedFlowMaterializer( + settings, + context.actorOf(StreamSupervisor.props(settings).withDispatcher(settings.dispatcher)), + FlowNameCounter(system).counter, + namePrefix.getOrElse("flow")) + } + + /** + * Java API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + */ + def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer = + apply(settings)(context) +} + +/** + * A FlowMaterializer takes the list of transformations comprising a + * [[akka.stream.scaladsl.Flow]] and materializes them in the form of + * [[org.reactivestreams.Processor]] instances. How transformation + * steps are split up into asynchronous regions is implementation + * dependent. + */ +abstract class FlowMaterializer(val settings: MaterializerSettings) { + + /** + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. + */ + def withNamePrefix(name: String): FlowMaterializer + + /** + * INTERNAL API + * ops are stored in reverse order + */ + private[akka] def toPublisher[I, O](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Publisher[O] + +} + diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala index 1dc9a7d504..57440f2e66 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala @@ -11,87 +11,12 @@ import scala.concurrent.Future import akka.stream.OverflowStrategy class CombinatorSpec extends WordSpec with Matchers { - val f = From[Int] + val f = FlowFrom[Int] "Linear simple combinators in Flow" should { "map" in { val t: ProcessorFlow[Int, Int] = f.map(_ * 2) } - "mapFuture" in { - import scala.concurrent.ExecutionContext.Implicits.global - val t: ProcessorFlow[Int, Int] = f.mapFuture(Future(_)) - } - "filter" in { - val t: ProcessorFlow[Int, Int] = f.filter(_ != 2) - } - "collect" in { - val t: ProcessorFlow[Int, String] = f.collect { - case i: Int if i == 2 ⇒ "two" - } - } - "fold" in { - val fo = FoldOut("elements:") { (soFar, element: Int) ⇒ soFar + element } - val t: SubscriberFlow[Int, Int] = f.withOutput(fo) - } - "drop" in { - val t: ProcessorFlow[Int, Int] = f.drop(2) - } - "dropWithin" in { - val t: ProcessorFlow[Int, Int] = f.dropWithin(2.seconds) - } - "take" in { - val t: ProcessorFlow[Int, Int] = f.take(2) - } - "takeWithin" in { - val t: ProcessorFlow[Int, Int] = f.takeWithin(2.seconds) - } - "grouped" in { - val t: ProcessorFlow[Int, immutable.Seq[Int]] = f.grouped(2) - } - "groupedWithin" in { - val t: ProcessorFlow[Int, immutable.Seq[Int]] = f.groupedWithin(2, 2.seconds) - } - "mapConcat" in { - val t: ProcessorFlow[Int, Int] = f.mapConcat { i ⇒ immutable.Seq(i, i, i) } - } - "conflate" in { - val t: ProcessorFlow[Int, String] = f.conflate(_.toString, (soFar: String, i) ⇒ soFar + i) - } - "expand" in { - val t: ProcessorFlow[Int, String] = f.expand(_.toString, (soFar: String) ⇒ (soFar, "_")) - } - "buffer" in { - val t: ProcessorFlow[Int, Int] = f.buffer(100, OverflowStrategy.DropHead) - } - } - - "Linear combinators which produce multiple flows" should { - "prefixAndTail" in { - val t: ProcessorFlow[Int, (immutable.Seq[String], PublisherFlow[String, String])] = - f.map(_.toString).prefixAndTail(10) - } - "groupBy" in { - val grouped: PublisherFlow[Int, (String, PublisherFlow[Int, Int])] = - From(immutable.Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o.toString) - - val closedInner: PublisherFlow[Int, (String, RunnableFlow[Int, Int])] = grouped.map { - case (key, openFlow) ⇒ (key, openFlow.withOutput(PublisherOut())) - } - - // both of these compile, even if `grouped` has inner flows unclosed - grouped.withOutput(PublisherOut()).run - closedInner.withOutput(PublisherOut()).run - } - "splitWhen" in { - val t: ProcessorFlow[Int, PublisherFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) - } - } - - "Linear combinators which consume multiple flows" should { - "flatten" in { - val split: ProcessorFlow[Int, PublisherFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) - val flattened: ProcessorFlow[Int, String] = split.flatten(FlattenStrategy.concatPublisherFlow) - } } } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphSpec.scala deleted file mode 100644 index 2c966f061e..0000000000 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphSpec.scala +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.scaladsl2 - -import org.scalatest.{ WordSpec, Matchers } - -import scala.collection.immutable.Seq - -class GraphSpec extends WordSpec with Matchers { - - "A Flow Graph" should { - "merge" in { - val merge = Merge[Int, Int, Int]() - - val in1 = From[Int].withOutput(merge.in1) - val in2 = From[Int].withOutput(merge.in2) - val out1 = From[Int].withInput(merge.out) - - val out2 = From[String] - // FIXME: make me not compile - //"out2.withInput(merge.out)" shouldNot compile - } - "zip" in { - val zip = Zip[Int, String]() - - val in1 = From[Int].withOutput(zip.in1) - val in2 = From[String].withOutput(zip.in2) - val out1 = From[(Int, String)].withInput(zip.out) - - val out2 = From[(String, Int)] - // FIXME: make me not compile - //"out2.withInput(zip.out)" shouldNot compile - } - "concat" in { - trait A - trait B extends A - - val concat = Concat[A, B, A]() - val in1 = From[A].withOutput(concat.in1) - val in2 = From[B].withOutput(concat.in2) - val out1 = From[A].withInput(concat.out) - - val out2 = From[String] - // FIXME: make me not compile - //"out2.withInput(concat.out)" shouldNot compile - } - "broadcast" in { - val broadcast = Broadcast[Int]() - - val in1 = From[Int].withOutput(broadcast.in) - val in2 = From[Int].withInput(broadcast.out1) - val out1 = From[Int].withInput(broadcast.out2) - - val out2 = From[String] - // FIXME: make me not compile - //"out2.withInput(broadcast.out2)" shouldNot compile - } - } -} diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala index ec69b5d2c5..e0da6678ab 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala @@ -4,79 +4,81 @@ package akka.stream.scaladsl2 import org.scalatest.{ Matchers, WordSpec } - import scala.collection.immutable.Seq import scala.concurrent.Future +import akka.stream.testkit.AkkaSpec +import akka.stream.MaterializerSettings -class FlowSpec extends WordSpec with Matchers { +class FlowSpec extends AkkaSpec { val intSeq = IterableIn(Seq(1, 2, 3)) val strSeq = IterableIn(Seq("a", "b", "c")) import scala.concurrent.ExecutionContext.Implicits.global val intFut = FutureIn(Future { 3 }) + implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) "ProcessorFlow" should { "go through all states" in { - val f: ProcessorFlow[Int, Int] = From[Int] + val f: ProcessorFlow[Int, Int] = FlowFrom[Int] .withInput(intSeq) .withOutput(PublisherOut()) .withoutInput .withoutOutput } "should not run" in { - val open: ProcessorFlow[Int, Int] = From[Int] - "open.run" shouldNot compile + val open: ProcessorFlow[Int, Int] = FlowFrom[Int] + "open.run()" shouldNot compile } "accept IterableIn" in { - val f: PublisherFlow[Int, Int] = From[Int].withInput(intSeq) + val f: PublisherFlow[Int, Int] = FlowFrom[Int].withInput(intSeq) } "accept FutureIn" in { - val f: PublisherFlow[Int, Int] = From[Int].withInput(intFut) + val f: PublisherFlow[Int, Int] = FlowFrom[Int].withInput(intFut) } "append ProcessorFlow" in { - val open1: ProcessorFlow[Int, String] = From[Int].map(_.toString) - val open2: ProcessorFlow[String, Int] = From[String].map(_.hashCode) + val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) + val open2: ProcessorFlow[String, Int] = FlowFrom[String].map(_.hashCode) val open3: ProcessorFlow[Int, Int] = open1.append(open2) - "open3.run" shouldNot compile + "open3.run()" shouldNot compile val closedInput: PublisherFlow[Int, Int] = open3.withInput(intSeq) - "closedInput.run" shouldNot compile + "closedInput.run()" shouldNot compile val closedOutput: SubscriberFlow[Int, Int] = open3.withOutput(PublisherOut()) - "closedOutput.run" shouldNot compile + "closedOutput.run()" shouldNot compile - closedInput.withOutput(PublisherOut()).run - closedOutput.withInput(intSeq).run + closedInput.withOutput(PublisherOut()).run() + closedOutput.withInput(intSeq).run() } "prepend ProcessorFlow" in { - val open1: ProcessorFlow[Int, String] = From[Int].map(_.toString) - val open2: ProcessorFlow[String, Int] = From[String].map(_.hashCode) + val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) + val open2: ProcessorFlow[String, Int] = FlowFrom[String].map(_.hashCode) val open3: ProcessorFlow[String, String] = open1.prepend(open2) - "open3.run" shouldNot compile + "open3.run()" shouldNot compile val closedInput: PublisherFlow[String, String] = open3.withInput(strSeq) - "closedInput.run" shouldNot compile + "closedInput.run()" shouldNot compile val closedOutput: SubscriberFlow[String, String] = open3.withOutput(PublisherOut()) - "closedOutput.run" shouldNot compile + "closedOutput.run()" shouldNot compile closedInput.withOutput(PublisherOut()).run closedOutput.withInput(strSeq).run } "append SubscriberFlow" in { - val open: ProcessorFlow[Int, String] = From[Int].map(_.toString) - val closedOutput: SubscriberFlow[String, Int] = From[String].map(_.hashCode).withOutput(PublisherOut()) + val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) + val closedOutput: SubscriberFlow[String, Int] = FlowFrom[String].map(_.hashCode).withOutput(PublisherOut()) val appended: SubscriberFlow[Int, Int] = open.append(closedOutput) - "appended.run" shouldNot compile + "appended.run()" shouldNot compile "appended.toFuture" shouldNot compile appended.withInput(intSeq).run } "prepend PublisherFlow" in { - val open: ProcessorFlow[Int, String] = From[Int].map(_.toString) - val closedInput: PublisherFlow[String, Int] = From[String].map(_.hashCode).withInput(strSeq) + val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) + val closedInput: PublisherFlow[String, Int] = FlowFrom[String].map(_.hashCode).withInput(strSeq) val prepended: PublisherFlow[String, String] = open.prepend(closedInput) - "prepended.run" shouldNot compile + "prepended.run()" shouldNot compile "prepended.withInput(strSeq)" shouldNot compile prepended.withOutput(PublisherOut()).run } @@ -84,7 +86,7 @@ class FlowSpec extends WordSpec with Matchers { "SubscriberFlow" should { val openInput: SubscriberFlow[Int, String] = - From[Int].map(_.toString).withOutput(PublisherOut()) + FlowFrom[Int].map(_.toString).withOutput(PublisherOut()) "accept Input" in { openInput.withInput(intSeq) } @@ -97,14 +99,14 @@ class FlowSpec extends WordSpec with Matchers { "not accept Output" in { "openInput.ToFuture" shouldNot compile } - "not run" in { - "openInput.run" shouldNot compile + "not run()" in { + "openInput.run()" shouldNot compile } } "PublisherFlow" should { val openOutput: PublisherFlow[Int, String] = - From(Seq(1, 2, 3)).map(_.toString) + FlowFrom(Seq(1, 2, 3)).map(_.toString) "accept Output" in { openOutput.withOutput(PublisherOut()) } @@ -117,16 +119,16 @@ class FlowSpec extends WordSpec with Matchers { "not accept Input" in { "openOutput.withInput(intSeq)" shouldNot compile } - "not run" in { - "openOutput.run" shouldNot compile + "not run()" in { + "openOutput.run()" shouldNot compile } } "RunnableFlow" should { val closed: RunnableFlow[Int, String] = - From(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut()) + FlowFrom(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut()) "run" in { - closed.run + closed.run() } "drop Input" in { closed.withoutInput diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala new file mode 100644 index 0000000000..6c32bda6bc --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala @@ -0,0 +1,403 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.testkit.{ EventFilter, TestProbe } +import com.typesafe.config.ConfigFactory +import scala.collection.immutable.Seq +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace +import akka.stream.Transformer +import akka.stream.MaterializerSettings + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { + + implicit val materializer = FlowMaterializer(MaterializerSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 2, + maxFanOutBufferSize = 2, + dispatcher = "akka.test.stream-dispatcher")) + + "A Flow with transform operations" must { + "produce one-to-one transformation as expected" in { + val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + var tot = 0 + override def onNext(elem: Int) = { + tot += elem + List(tot) + } + }). + withOutput(PublisherOut()).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(subscriber) + val subscription = subscriber.expectSubscription() + subscription.request(1) + subscriber.expectNext(1) + subscriber.expectNoMsg(200.millis) + subscription.request(2) + subscriber.expectNext(3) + subscriber.expectNext(6) + subscriber.expectComplete() + } + + "produce one-to-several transformation as expected" in { + val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + var tot = 0 + override def onNext(elem: Int) = { + tot += elem + Vector.fill(elem)(tot) + } + }). + withOutput(PublisherOut()).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(subscriber) + val subscription = subscriber.expectSubscription() + subscription.request(4) + subscriber.expectNext(1) + subscriber.expectNext(3) + subscriber.expectNext(3) + subscriber.expectNext(6) + subscriber.expectNoMsg(200.millis) + subscription.request(100) + subscriber.expectNext(6) + subscriber.expectNext(6) + subscriber.expectComplete() + } + + "produce dropping transformation as expected" in { + val p = FlowFrom(List(1, 2, 3, 4)).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + var tot = 0 + override def onNext(elem: Int) = { + tot += elem + if (elem % 2 == 0) { + Nil + } else { + List(tot) + } + } + }). + withOutput(PublisherOut()).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(subscriber) + val subscription = subscriber.expectSubscription() + subscription.request(1) + subscriber.expectNext(1) + subscriber.expectNoMsg(200.millis) + subscription.request(1) + subscriber.expectNext(6) + subscription.request(1) + subscriber.expectComplete() + } + + "produce multi-step transformation as expected" in { + val p = FlowFrom(List("a", "bc", "def")).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[String, Int] { + var concat = "" + override def onNext(elem: String) = { + concat += elem + List(concat.length) + } + }). + transform("transform", () ⇒ new Transformer[Int, Int] { + var tot = 0 + override def onNext(length: Int) = { + tot += length + List(tot) + } + }). + withOutput(PublisherOut()).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(c1) + val sub1 = c1.expectSubscription() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(c2) + val sub2 = c2.expectSubscription() + sub1.request(1) + sub2.request(2) + c1.expectNext(1) + c2.expectNext(1) + c2.expectNext(4) + c1.expectNoMsg(200.millis) + sub1.request(2) + sub2.request(2) + c1.expectNext(4) + c1.expectNext(10) + c2.expectNext(10) + c1.expectComplete() + c2.expectComplete() + } + + "invoke onComplete when done" in { + val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[String, String] { + var s = "" + override def onNext(element: String) = { + s += element + Nil + } + override def onTermination(e: Option[Throwable]) = List(s + "B") + }). + withOutput(PublisherOut()).toPublisher() + val c = StreamTestKit.SubscriberProbe[String]() + p2.subscribe(c) + val s = c.expectSubscription() + s.request(1) + c.expectNext("aB") + c.expectComplete() + } + + "invoke cleanup when done" in { + val cleanupProbe = TestProbe() + val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[String, String] { + var s = "" + override def onNext(element: String) = { + s += element + Nil + } + override def onTermination(e: Option[Throwable]) = List(s + "B") + override def cleanup() = cleanupProbe.ref ! s + }). + withOutput(PublisherOut()).toPublisher() + val c = StreamTestKit.SubscriberProbe[String]() + p2.subscribe(c) + val s = c.expectSubscription() + s.request(1) + c.expectNext("aB") + c.expectComplete() + cleanupProbe.expectMsg("a") + } + + "invoke cleanup when done consume" in { + val cleanupProbe = TestProbe() + val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() + FlowFrom(p). + transform("transform", () ⇒ new Transformer[String, String] { + var s = "x" + override def onNext(element: String) = { + s = element + List(element) + } + override def cleanup() = cleanupProbe.ref ! s + }). + withOutput(BlackholeOut()).run() + cleanupProbe.expectMsg("a") + } + + "invoke cleanup when done after error" in { + val cleanupProbe = TestProbe() + val p = FlowFrom(List("a", "b", "c")).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[String, String] { + var s = "" + override def onNext(in: String) = { + if (in == "b") { + throw new IllegalArgumentException("Not b") with NoStackTrace + } else { + val out = s + in + s += in.toUpperCase + List(out) + } + } + override def onTermination(e: Option[Throwable]) = List(s + "B") + override def cleanup() = cleanupProbe.ref ! s + }). + withOutput(PublisherOut()).toPublisher() + val c = StreamTestKit.SubscriberProbe[String]() + p2.subscribe(c) + val s = c.expectSubscription() + s.request(1) + c.expectNext("a") + s.request(1) + c.expectError() + cleanupProbe.expectMsg("A") + } + + "allow cancellation using isComplete" in { + val p = StreamTestKit.PublisherProbe[Int]() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + var s = "" + override def onNext(element: Int) = { + s += element + List(element) + } + override def isComplete = s == "1" + }). + withOutput(PublisherOut()).toPublisher() + val proc = p.expectSubscription + val c = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(c) + val s = c.expectSubscription() + s.request(10) + proc.sendNext(1) + proc.sendNext(2) + c.expectNext(1) + c.expectComplete() + proc.expectCancellation() + } + + "call onComplete after isComplete signaled completion" in { + val cleanupProbe = TestProbe() + val p = StreamTestKit.PublisherProbe[Int]() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + var s = "" + override def onNext(element: Int) = { + s += element + List(element) + } + override def isComplete = s == "1" + override def onTermination(e: Option[Throwable]) = List(s.length + 10) + override def cleanup() = cleanupProbe.ref ! s + }). + withOutput(PublisherOut()).toPublisher() + val proc = p.expectSubscription + val c = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(c) + val s = c.expectSubscription() + s.request(10) + proc.sendNext(1) + proc.sendNext(2) + c.expectNext(1) + c.expectNext(11) + c.expectComplete() + proc.expectCancellation() + cleanupProbe.expectMsg("1") + } + + "report error when exception is thrown" in { + val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + override def onNext(elem: Int) = { + if (elem == 2) { + throw new IllegalArgumentException("two not allowed") + } else { + List(elem, elem) + } + } + }). + withOutput(PublisherOut()).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(subscriber) + val subscription = subscriber.expectSubscription() + EventFilter[IllegalArgumentException]("two not allowed") intercept { + subscription.request(100) + subscriber.expectNext(1) + subscriber.expectNext(1) + subscriber.expectError().getMessage should be("two not allowed") + subscriber.expectNoMsg(200.millis) + } + } + + "support cancel as expected" in { + val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + override def onNext(elem: Int) = List(elem, elem) + }). + withOutput(PublisherOut()).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(subscriber) + val subscription = subscriber.expectSubscription() + subscription.request(2) + subscriber.expectNext(1) + subscription.cancel() + subscriber.expectNext(1) + subscriber.expectNoMsg(500.millis) + subscription.request(2) + subscriber.expectNoMsg(200.millis) + } + + "support producing elements from empty inputs" in { + val p = FlowFrom(List.empty[Int]).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + override def onNext(elem: Int) = Nil + override def onTermination(e: Option[Throwable]) = List(1, 2, 3) + }). + withOutput(PublisherOut()).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(subscriber) + val subscription = subscriber.expectSubscription() + subscription.request(4) + subscriber.expectNext(1) + subscriber.expectNext(2) + subscriber.expectNext(3) + subscriber.expectComplete() + + } + + "support converting onComplete into onError" in { + val subscriber = StreamTestKit.SubscriberProbe[Int]() + FlowFrom(List(5, 1, 2, 3)).transform("transform", () ⇒ new Transformer[Int, Int] { + var expectedNumberOfElements: Option[Int] = None + var count = 0 + override def onNext(elem: Int) = + if (expectedNumberOfElements.isEmpty) { + expectedNumberOfElements = Some(elem) + Nil + } else { + count += 1 + List(elem) + } + override def onTermination(err: Option[Throwable]) = err match { + case Some(e) ⇒ Nil + case None ⇒ + expectedNumberOfElements match { + case Some(expected) if count != expected ⇒ + throw new RuntimeException(s"Expected $expected, got $count") with NoStackTrace + case _ ⇒ Nil + } + } + }).withOutput(PublisherOut()).produceTo(subscriber) + + val subscription = subscriber.expectSubscription() + subscription.request(10) + + subscriber.expectNext(1) + subscriber.expectNext(2) + subscriber.expectNext(3) + subscriber.expectError().getMessage should be("Expected 5, got 3") + } + + "be safe to reuse" in { + val flow = FlowFrom(1 to 3).transform("transform", () ⇒ + new Transformer[Int, Int] { + var count = 0 + + override def onNext(elem: Int): Seq[Int] = { + count += 1 + List(count) + } + }).withOutput(PublisherOut()) + + val s1 = StreamTestKit.SubscriberProbe[Int]() + flow.produceTo(s1) + s1.expectSubscription().request(3) + s1.expectNext(1, 2, 3) + s1.expectComplete() + + val s2 = StreamTestKit.SubscriberProbe[Int]() + flow.produceTo(s2) + s2.expectSubscription().request(3) + s2.expectNext(1, 2, 3) + s2.expectComplete() + } + } + +}