diff --git a/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala new file mode 100644 index 0000000000..4a9defde83 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala @@ -0,0 +1,114 @@ +package akka.http.server + +import akka.event.LoggingAdapter +import akka.http.Http +import akka.http.model.{ ErrorInfo, HttpRequest, HttpResponse } +import akka.http.parsing.HttpRequestParser +import akka.http.parsing.ParserOutput._ +import akka.http.rendering.ResponseRenderingContext +import akka.stream.dsl._ +import akka.stream.io.StreamTcp +import akka.stream.{ FlowMaterializer, Transformer } +import akka.util.ByteString + +class NewDslHttpServerPipeline(settings: ServerSettings, + materializer: FlowMaterializer, + log: LoggingAdapter) { + + import akka.http.server.NewDslHttpServerPipeline._ + + val rootParser = new HttpRequestParser(settings.parserSettings, settings.rawRequestUriHeader, materializer)() + val warnOnIllegalHeader: ErrorInfo ⇒ Unit = errorInfo ⇒ + if (settings.parserSettings.illegalHeaderWarnings) + log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty) + + val responseRendererFactory = new { + def newRenderer: Transformer[ResponseRenderingContext, OpenOutputFlow[ByteString, ByteString]] = ??? + } + + /** + * Flow graph: + * + * tcpConn.inputStream ---> requestFlowBeforeBroadcast -+-> requestFlowAfterBroadcast ---> Publisher[HttpRequest] + * | + * \-> applicationBypassFlow -\ + * | + * Subscriber[HttpResponse] ---> responseFlowBeforeMerge -+-> responseFlowAfterMerge --> tcpConn.outputStream + */ + def apply(tcpConn: StreamTcp.IncomingTcpConnection) = { + + val broadcast = Broadcast[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]() + val merge = Merge[MessageStart, HttpResponse, Any]() + + val requestFlowBeforeBroadcast: ClosedFlow[ByteString, (RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] = + From(tcpConn.inputStream) + .transform(rootParser.copyWith(warnOnIllegalHeader)) + .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) + .headAndTail() + .withOutput(broadcast.in) + + val applicationBypassFlow: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), MessageStart] = + From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] + .withInput(broadcast.out1) + .collect[MessageStart with RequestOutput] { case (x: MessageStart, _) ⇒ x } + .withOutput(merge.in1) + + val requestPublisher = PublisherOut[HttpRequest]() + val requestFlowAfterBroadcast: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), HttpRequest] = + From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] + .withInput(broadcast.out2) + .collect { + case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts) ⇒ + val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) + val publisher = PublisherOut[RequestOutput]() + val flow = entityParts.withOutput(publisher) + HttpRequest(method, effectiveUri, headers, createEntity(publisher.publisher), protocol) + } + .withOutput(requestPublisher) + + val responseSubscriber = SubscriberIn[HttpResponse]() + val responseFlowBeforeMerge: ClosedFlow[HttpResponse, HttpResponse] = + From[HttpResponse] + .withInput(responseSubscriber) + .withOutput(merge.in2) + + val responseFlowAfterMerge: ClosedFlow[Any, ByteString] = + From[Any] + .withInput(merge.out) + .transform(applyApplicationBypass) + .transform(responseRendererFactory.newRenderer) + .flatten(FlattenStrategy.concatOpenOutputFlow) + .transform(errorLogger(log, "Outgoing response stream error")) + .withOutput(SubscriberOut(tcpConn.outputStream)) + + Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher.publisher, responseSubscriber.subscriber) + } + + def applyApplicationBypass: Transformer[Any, ResponseRenderingContext] = ??? + + private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = ??? +} + +object NewDslHttpServerPipeline { + + /** + * FIXME: We can't use `HasOpenOutput` here, because conversion would convert either `OpenFlow` + * or `OpenOutputFlow` to `HasOpenOutput`. + * + * Therefore we need two separate conversions, one for `OpeFlow` another for `OpenOutputFlow`. + */ + implicit class OpenOutputFlowWithHeadAndTail[In, InnerIn, InnerOut](val underlying: OpenOutputFlow[In, OpenOutputFlow[InnerIn, InnerOut]]) extends AnyVal { + def headAndTail(): OpenOutputFlow[In, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] = { + val flow: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], OpenOutputFlow[InnerIn, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])]] = + From[OpenOutputFlow[InnerIn, InnerOut]] + .map { f ⇒ + f.prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) } + } + + val flattened: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] = + flow.flatten(FlattenStrategy.concatOpenOutputFlow) + + underlying.append(flattened) + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala new file mode 100644 index 0000000000..aa2edb4dd8 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala @@ -0,0 +1,228 @@ +package akka.stream.dsl + +import akka.stream.impl.Ast +import org.reactivestreams.{ Subscriber, Publisher } + +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import akka.stream.{ Transformer, OverflowStrategy, FlattenStrategy } + +sealed trait Flow[-In, +Out] { + val transform: Transform[In, Out] +} + +object From { + /** + * Helper to create Flow without Input. + * Example usage: From[Int] + */ + def apply[T]: OpenFlow[T, T] = OpenFlow[T, T](EmptyTransform[T, T]()) + + /** + * Helper to create Flow with Input from Iterable. + * Example usage: Flow(Seq(1,2,3)) + */ + def apply[T](i: immutable.Iterable[T]): OpenOutputFlow[T, T] = From[T].withInput(IterableIn(i)) + + /** + * Helper to create Flow with Input from Future. + * Example usage: Flow(Future { 1 }) + */ + def apply[T](f: Future[T]): OpenOutputFlow[T, T] = From[T].withInput(FutureIn(f)) + + /** + * Helper to create Flow with Input from Publisher. + */ + def apply[T](p: Publisher[T]): OpenOutputFlow[T, T] = From[T].withInput(PublisherIn(p)) +} + +trait Input[-In] + +/** + * Default input. + * Allows to materialize a Flow with this input to Subscriber. + */ +final case class SubscriberIn[-In]() extends Input[In] { + def subscriber[I <: In]: Subscriber[I] = ??? +} + +/** + * Input from Publisher. + */ +final case class PublisherIn[-In](p: Publisher[_ >: In]) extends Input[In] + +/** + * Input from Iterable + * + * Changing In from Contravariant to Covariant is needed because Iterable[+A]. + * But this brakes IterableIn variance and we get IterableIn(Seq(1,2,3)): IterableIn[Any] + */ +final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[In] + +/** + * Input from Future + * + * Changing In from Contravariant to Covariant is needed because Future[+A]. + * But this brakes FutureIn variance and we get FutureIn(Future{1}): FutureIn[Any] + */ +final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In] + +trait Output[+Out] + +/** + * Default output. + * Allows to materialize a Flow with this output to Publisher. + */ +final case class PublisherOut[+Out]() extends Output[Out] { + def publisher[O >: Out]: Publisher[O] = ??? +} + +/** + * Output to a Subscriber. + */ +final case class SubscriberOut[+Out](s: Subscriber[_ <: Out]) extends Output[Out] + +/** + * Fold output. Reduces output stream according to the given fold function. + */ +final case class FoldOut[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Output[Out] { + def future: Future[T] = ??? +} + +/** + * Operations with a Flow which has open (no attached) Input. + * + * No Out type parameter would be useful for Graph signatures, but we need it here + * for `withInput` and `prependTransform` methods. + */ +sealed trait HasOpenInput[-In, +Out] { + type Repr[-In, +Out] <: HasOpenInput[In, Out] + type AfterCloseInput[-In, +Out] <: Flow[In, Out] + + def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] + protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] + + // linear combinators with flows + def prepend[T](f: OpenFlow[T, In]): Repr[T, Out] = + prependTransform(f.transform) + def prepend[T](f: OpenOutputFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = + prependTransform(f.transform).withInput(f.input) +} + +/** + * Operations with a Flow which has open (no attached) Output. + * + * No In type parameter would be useful for Graph signatures, but we need it here + * for `withOutput` and `appendTransform` methods. + */ +trait HasOpenOutput[-In, +Out] { + type Repr[-In, +Out] <: HasOpenOutput[In, Out] + type AfterCloseOutput[-In, +Out] <: Flow[In, Out] + + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] + protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] + + // linear simple combinators + def map[T](f: Out ⇒ T): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) + def mapFuture[T](f: Out ⇒ Future[T]): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) + def filter(p: Out ⇒ Boolean): Repr[In, Out] = + appendTransform(EmptyTransform[Out, Out]()) + def collect[T](pf: PartialFunction[Out, T]): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) + def drop(n: Int): Repr[In, Out] = + appendTransform(EmptyTransform[Out, Out]()) + def dropWithin(d: FiniteDuration): Repr[In, Out] = + appendTransform(EmptyTransform[Out, Out]()) + def take(n: Int): Repr[In, Out] = + appendTransform(EmptyTransform[Out, Out]()) + def takeWithin(d: FiniteDuration): Repr[In, Out] = + appendTransform(EmptyTransform[Out, Out]()) + def grouped(n: Int): Repr[In, immutable.Seq[Out]] = + appendTransform(EmptyTransform[Out, immutable.Seq[Out]]()) + def groupedWithin(n: Int, d: FiniteDuration): Repr[In, immutable.Seq[Out]] = + appendTransform(EmptyTransform[Out, immutable.Seq[Out]]()) + def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) + def transform[T](transformer: Transformer[Out, T]): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) + def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[In, S] = + appendTransform(EmptyTransform[Out, S]()) + def expand[S, O](seed: Out ⇒ S, extrapolate: S ⇒ (O, S)): Repr[In, O] = + appendTransform(EmptyTransform[Out, O]()) + def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[In, Out] = + appendTransform(EmptyTransform[Out, Out]()) + + // linear combinators which produce multiple flows + def prefixAndTail[O >: Out](n: Int): Repr[In, (immutable.Seq[O], OpenOutputFlow[O, O])] = + appendTransform(EmptyTransform[Out, (immutable.Seq[O], OpenOutputFlow[O, O])]()) + def groupBy[O >: Out, K](f: O ⇒ K): Repr[In, (K, OpenOutputFlow[O, O])] = + appendTransform(EmptyTransform[Out, (K, OpenOutputFlow[O, O])]()) + def splitWhen[O >: Out](p: Out ⇒ Boolean): Repr[In, OpenOutputFlow[O, O]] = + appendTransform(EmptyTransform[Out, OpenOutputFlow[O, O]]()) + + // linear combinators which consume multiple flows + def flatten[T](strategy: FlattenStrategy[Out, T]): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) + + // linear combinators with flows + def append[T](f: OpenFlow[Out, T]): Repr[In, T] = + appendTransform(f.transform) + def append[T](f: OpenInputFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = + appendTransform(f.transform).withOutput(f.output) +} + +final case class OpenFlow[-In, +Out](transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] with HasOpenInput[In, Out] { + override type Repr[-In, +Out] = OpenFlow[In, Out] + type AfterCloseOutput[-In, +Out] = OpenInputFlow[In, Out] + type AfterCloseInput[-In, +Out] = OpenOutputFlow[In, Out] + + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform) + def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = OpenOutputFlow(in, transform) + + protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = OpenFlow(t ++ transform) + protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] = OpenFlow(transform ++ t) +} + +final case class OpenInputFlow[-In, +Out](output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenInput[In, Out] { + type Repr[-In, +Out] = OpenInputFlow[In, Out] + type AfterCloseInput[-In, +Out] = ClosedFlow[In, Out] + + def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = ClosedFlow(in, output, transform) + def withoutOutput: OpenFlow[In, Out] = OpenFlow(transform) + + protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = + OpenInputFlow(output, t ++ transform) +} + +final case class OpenOutputFlow[-In, +Out](input: Input[In], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] { + override type Repr[-In, +Out] = OpenOutputFlow[In, Out] + type AfterCloseOutput[-In, +Out] = ClosedFlow[In, Out] + + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform) + def withoutInput: OpenFlow[In, Out] = OpenFlow(transform) + + protected def appendTransform[T](t: Transform[Out, T]) = OpenOutputFlow(input, transform ++ t) +} + +final case class ClosedFlow[-In, +Out](input: Input[In], output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] { + def withoutOutput: OpenOutputFlow[In, Out] = OpenOutputFlow(input, transform) + def withoutInput: OpenInputFlow[In, Out] = OpenInputFlow(output, transform) + + def run(): Unit = () +} + +trait Transform[-In, +Out] { + def ++[T](t: Transform[Out, T]): Transform[In, T] = EmptyTransform[In, T]() +} +final case class EmptyTransform[-In, +Out]() extends Transform[In, Out] + +object FlattenStrategy { + def concatOpenOutputFlow[In, Out]: FlattenStrategy[OpenOutputFlow[In, Out], Out] = ConcatOpenOutputFlow[In, Out]() + def concatOpenFlow[In, Out]: FlattenStrategy[OpenFlow[In, Out], Out] = ConcatOpenFlow[In, Out]() + + final case class ConcatOpenOutputFlow[In, Out]() extends FlattenStrategy[OpenOutputFlow[In, Out], Out] + final case class ConcatOpenFlow[In, Out]() extends FlattenStrategy[OpenFlow[In, Out], Out] +} diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala new file mode 100644 index 0000000000..3030f6cff9 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala @@ -0,0 +1,25 @@ +package akka.stream.dsl + +final case class Merge[T, U, V >: T with U]() { + val in1 = new Output[T] {} + val in2 = new Output[U] {} + val out = new Input[V] {} +} + +final case class Zip[T, U]() { + val in1 = new Output[T] {} + val in2 = new Output[U] {} + val out = new Input[(T, U)] {} +} + +final case class Concat[T, U, V >: T with U]() { + val in1 = new Output[T] {} + val in2 = new Output[U] {} + val out = new Input[V] {} +} + +final case class Broadcast[T]() { + val in = new Output[T] {} + val out1 = new Input[T] {} + val out2 = new Input[T] {} +} diff --git a/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala new file mode 100644 index 0000000000..4e3f60bae8 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala @@ -0,0 +1,94 @@ +package akka.stream.dsl + +import org.scalatest.Matchers +import org.scalatest.WordSpec +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.concurrent.Future +import akka.stream.OverflowStrategy + +class CombinatorSpec extends WordSpec with Matchers { + val f = From[Int] + + "Linear simple combinators in Flow" should { + "map" in { + val t: OpenFlow[Int, Int] = f.map(_ * 2) + } + "mapFuture" in { + import scala.concurrent.ExecutionContext.Implicits.global + val t: OpenFlow[Int, Int] = f.mapFuture(Future(_)) + } + "filter" in { + val t: OpenFlow[Int, Int] = f.filter(_ != 2) + } + "collect" in { + val t: OpenFlow[Int, String] = f.collect { + case i: Int if i == 2 ⇒ "two" + } + } + "fold" in { + val fo = FoldOut("elements:") { (soFar, element: Int) ⇒ soFar + element } + val t: OpenInputFlow[Int, Int] = f.withOutput(fo) + } + "drop" in { + val t: OpenFlow[Int, Int] = f.drop(2) + } + "dropWithin" in { + val t: OpenFlow[Int, Int] = f.dropWithin(2.seconds) + } + "take" in { + val t: OpenFlow[Int, Int] = f.take(2) + } + "takeWithin" in { + val t: OpenFlow[Int, Int] = f.takeWithin(2.seconds) + } + "grouped" in { + val t: OpenFlow[Int, immutable.Seq[Int]] = f.grouped(2) + } + "groupedWithin" in { + val t: OpenFlow[Int, immutable.Seq[Int]] = f.groupedWithin(2, 2.seconds) + } + "mapConcat" in { + val t: OpenFlow[Int, Int] = f.mapConcat { i ⇒ immutable.Seq(i, i, i) } + } + "conflate" in { + val t: OpenFlow[Int, String] = f.conflate(_.toString, (soFar: String, i) ⇒ soFar + i) + } + "expand" in { + val t: OpenFlow[Int, String] = f.expand(_.toString, (soFar: String) ⇒ (soFar, "_")) + } + "buffer" in { + val t: OpenFlow[Int, Int] = f.buffer(100, OverflowStrategy.DropHead) + } + } + + "Linear combinators which produce multiple flows" should { + "prefixAndTail" in { + val t: OpenFlow[Int, (immutable.Seq[String], OpenOutputFlow[String, String])] = + f.map(_.toString).prefixAndTail(10) + } + "groupBy" in { + val grouped: OpenOutputFlow[Int, (String, OpenOutputFlow[Int, Int])] = + From(immutable.Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o.toString) + + val closedInner: OpenOutputFlow[Int, (String, ClosedFlow[Int, Int])] = grouped.map { + case (key, openFlow) ⇒ (key, openFlow.withOutput(PublisherOut())) + } + + // both of these compile, even if `grouped` has inner flows unclosed + grouped.withOutput(PublisherOut()).run + closedInner.withOutput(PublisherOut()).run + } + "splitWhen" in { + val t: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) + } + } + + "Linear combinators which consume multiple flows" should { + "flatten" in { + val split: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) + val flattened: OpenFlow[Int, String] = split.flatten(FlattenStrategy.concatOpenOutputFlow) + } + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala new file mode 100644 index 0000000000..fe6cee34dd --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala @@ -0,0 +1,141 @@ +package akka.stream.dsl + +import org.scalatest.{ Matchers, WordSpec } + +import scala.collection.immutable.Seq +import scala.concurrent.Future + +class FlowSpec extends WordSpec with Matchers { + + val intSeq = IterableIn(Seq(1, 2, 3)) + val strSeq = IterableIn(Seq("a", "b", "c")) + + import scala.concurrent.ExecutionContext.Implicits.global + val intFut = FutureIn(Future { 3 }) + + "OpenFlow" should { + "go through all states" in { + val f: OpenFlow[Int, Int] = From[Int] + .withInput(intSeq) + .withOutput(PublisherOut()) + .withoutInput + .withoutOutput + } + "should not run" in { + val open: OpenFlow[Int, Int] = From[Int] + "open.run" shouldNot compile + } + "accept IterableIn" in { + val f: OpenOutputFlow[Int, Int] = From[Int].withInput(intSeq) + } + "accept FutureIn" in { + val f: OpenOutputFlow[Int, Int] = From[Int].withInput(intFut) + } + "append OpenFlow" in { + val open1: OpenFlow[Int, String] = From[Int].map(_.toString) + val open2: OpenFlow[String, Int] = From[String].map(_.hashCode) + val open3: OpenFlow[Int, Int] = open1.append(open2) + "open3.run" shouldNot compile + + val closedInput: OpenOutputFlow[Int, Int] = open3.withInput(intSeq) + "closedInput.run" shouldNot compile + + val closedOutput: OpenInputFlow[Int, Int] = open3.withOutput(PublisherOut()) + "closedOutput.run" shouldNot compile + + closedInput.withOutput(PublisherOut()).run + closedOutput.withInput(intSeq).run + } + "prepend OpenFlow" in { + val open1: OpenFlow[Int, String] = From[Int].map(_.toString) + val open2: OpenFlow[String, Int] = From[String].map(_.hashCode) + val open3: OpenFlow[String, String] = open1.prepend(open2) + "open3.run" shouldNot compile + + val closedInput: OpenOutputFlow[String, String] = open3.withInput(strSeq) + "closedInput.run" shouldNot compile + + val closedOutput: OpenInputFlow[String, String] = open3.withOutput(PublisherOut()) + "closedOutput.run" shouldNot compile + + closedInput.withOutput(PublisherOut()).run + closedOutput.withInput(strSeq).run + } + "append OpenInputFlow" in { + val open: OpenFlow[Int, String] = From[Int].map(_.toString) + val closedOutput: OpenInputFlow[String, Int] = From[String].map(_.hashCode).withOutput(PublisherOut()) + val appended: OpenInputFlow[Int, Int] = open.append(closedOutput) + "appended.run" shouldNot compile + "appended.toFuture" shouldNot compile + appended.withInput(intSeq).run + } + "prepend OpenOutputFlow" in { + val open: OpenFlow[Int, String] = From[Int].map(_.toString) + val closedInput: OpenOutputFlow[String, Int] = From[String].map(_.hashCode).withInput(strSeq) + val prepended: OpenOutputFlow[String, String] = open.prepend(closedInput) + "prepended.run" shouldNot compile + "prepended.withInput(strSeq)" shouldNot compile + prepended.withOutput(PublisherOut()).run + } + } + + "OpenInputFlow" should { + val openInput: OpenInputFlow[Int, String] = + From[Int].map(_.toString).withOutput(PublisherOut()) + "accept Input" in { + openInput.withInput(intSeq) + } + "drop Output" in { + openInput.withoutOutput + } + "not drop Input" in { + "openInput.withoutInput" shouldNot compile + } + "not accept Output" in { + "openInput.ToFuture" shouldNot compile + } + "not run" in { + "openInput.run" shouldNot compile + } + } + + "OpenOutputFlow" should { + val openOutput: OpenOutputFlow[Int, String] = + From(Seq(1, 2, 3)).map(_.toString) + "accept Output" in { + openOutput.withOutput(PublisherOut()) + } + "drop Input" in { + openOutput.withoutInput + } + "not drop Output" in { + "openOutput.withoutOutput" shouldNot compile + } + "not accept Input" in { + "openOutput.withInput(intSeq)" shouldNot compile + } + "not run" in { + "openOutput.run" shouldNot compile + } + } + + "ClosedFlow" should { + val closed: ClosedFlow[Int, String] = + From(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut()) + "run" in { + closed.run + } + "drop Input" in { + closed.withoutInput + } + "drop Output" in { + closed.withoutOutput + } + "not accept Input" in { + "closed.withInput(intSeq)" shouldNot compile + } + "not accept Output" in { + "closed.ToFuture" shouldNot compile + } + } +} diff --git a/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala new file mode 100644 index 0000000000..46647af26f --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala @@ -0,0 +1,57 @@ +package akka.stream.dsl + +import org.scalatest.{ WordSpec, Matchers } + +import scala.collection.immutable.Seq + +class GraphSpec extends WordSpec with Matchers { + + "Graph" should { + "merge" in { + val merge = Merge[Int, Int, Int]() + + val in1 = From[Int].withOutput(merge.in1) + val in2 = From[Int].withOutput(merge.in2) + val out1 = From[Int].withInput(merge.out) + + val out2 = From[String] + // FIXME: make me not compile + //"out2.withInput(merge.out)" shouldNot compile + } + "zip" in { + val zip = Zip[Int, String]() + + val in1 = From[Int].withOutput(zip.in1) + val in2 = From[String].withOutput(zip.in2) + val out1 = From[(Int, String)].withInput(zip.out) + + val out2 = From[(String, Int)] + // FIXME: make me not compile + //"out2.withInput(zip.out)" shouldNot compile + } + "concat" in { + trait A + trait B extends A + + val concat = Concat[A, B, A]() + val in1 = From[A].withOutput(concat.in1) + val in2 = From[B].withOutput(concat.in2) + val out1 = From[A].withInput(concat.out) + + val out2 = From[String] + // FIXME: make me not compile + //"out2.withInput(concat.out)" shouldNot compile + } + "broadcast" in { + val broadcast = Broadcast[Int]() + + val in1 = From[Int].withOutput(broadcast.in) + val in2 = From[Int].withInput(broadcast.out1) + val out1 = From[Int].withInput(broadcast.out2) + + val out2 = From[String] + // FIXME: make me not compile + //"out2.withInput(broadcast.out2)" shouldNot compile + } + } +}