diff --git a/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala new file mode 100644 index 0000000000..aa5fb7f46e --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala @@ -0,0 +1,114 @@ +package akka.http.server + +import akka.event.LoggingAdapter +import akka.http.Http +import akka.http.model.{ ErrorInfo, HttpRequest, HttpResponse } +import akka.http.parsing.HttpRequestParser +import akka.http.parsing.ParserOutput._ +import akka.http.rendering.ResponseRenderingContext +import akka.stream.dsl.{ FlowMaterializer ⇒ NewFM, _ } +import akka.stream.io.StreamTcp +import akka.stream.{ FlowMaterializer, Transformer } +import akka.util.ByteString + +class NewDslHttpServerPipeline(settings: ServerSettings, + materializer: FlowMaterializer, + log: LoggingAdapter) { + + import akka.http.server.NewDslHttpServerPipeline._ + + val rootParser = new HttpRequestParser(settings.parserSettings, settings.rawRequestUriHeader, materializer)() + val warnOnIllegalHeader: ErrorInfo ⇒ Unit = errorInfo ⇒ + if (settings.parserSettings.illegalHeaderWarnings) + log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty) + + val responseRendererFactory = new { + def newRenderer: Transformer[ResponseRenderingContext, OpenOutputFlow[ByteString, ByteString]] = ??? + } + + /** + * Flow graph: + * + * tcpConn.inputStream ---> requestFlowBeforeBroadcast -+-> requestFlowAfterBroadcast ---> Publisher[HttpRequest] + * | + * \-> applicationBypassFlow -\ + * | + * Subscriber[HttpResponse] ---> responseFlowBeforeMerge -+-> responseFlowAfterMerge --> tcpConn.outputStream + */ + def apply(tcpConn: StreamTcp.IncomingTcpConnection) = { + + val broadcast = Broadcast[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]() + val merge = Merge[MessageStart, HttpResponse, Any]() + + val requestFlowBeforeBroadcast: ClosedFlow[ByteString, (RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] = + From(tcpConn.inputStream) + .transform(rootParser.copyWith(warnOnIllegalHeader)) + .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) + .headAndTail() + .withOutput(broadcast.in) + + val applicationBypassFlow: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), MessageStart] = + From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] + .withInput(broadcast.out1) + .collect[MessageStart with RequestOutput] { case (x: MessageStart, _) ⇒ x } + .withOutput(merge.in1) + + val requestFlowAfterBroadcast: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), HttpRequest] = + From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] + .withInput(broadcast.out2) + .collect { + case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts) ⇒ + val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) + val pub = entityParts.withOutput(PublisherOut()).as(NewFM.publisher[RequestOutput]) + HttpRequest(method, effectiveUri, headers, createEntity(pub), protocol) + } + .withOutput(PublisherOut()) + + val responseFlowBeforeMerge: ClosedFlow[HttpResponse, HttpResponse] = + From[HttpResponse] + .withInput(SubscriberIn()) + .withOutput(merge.in2) + + val responseFlowAfterMerge: ClosedFlow[Any, ByteString] = + From[Any] + .withInput(merge.out) + .transform(applyApplicationBypass) + .transform(responseRendererFactory.newRenderer) + .flatten(FlattenStrategy.concatOpenOutputFlow) + .transform(errorLogger(log, "Outgoing response stream error")) + .withOutput(SubscriberOut(tcpConn.outputStream)) + + val requestPublisher = requestFlowAfterBroadcast.as(NewFM.publisher[HttpRequest]) + val responseSubscriber = responseFlowBeforeMerge.as(NewFM.subscriber[HttpResponse]) + + Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher, responseSubscriber) + } + + def applyApplicationBypass: Transformer[Any, ResponseRenderingContext] = ??? + + private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = ??? +} + +object NewDslHttpServerPipeline { + + /** + * FIXME: We can't use `HasOpenOutput` here, because conversion would convert either `OpenFlow` + * or `OpenOutputFlow` to `HasOpenOutput`. + * + * Therefore we need two separate conversions, one for `OpeFlow` another for `OpenOutputFlow`. + */ + implicit class OpenOutputFlowWithHeadAndTail[In, InnerIn, InnerOut](val underlying: OpenOutputFlow[In, OpenOutputFlow[InnerIn, InnerOut]]) extends AnyVal { + def headAndTail(): OpenOutputFlow[In, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] = { + val flow: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], OpenOutputFlow[InnerIn, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])]] = + From[OpenOutputFlow[InnerIn, InnerOut]] + .map { f ⇒ + f.prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) } + } + + val flattened: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] = + flow.flatten(FlattenStrategy.concatOpenOutputFlow) + + underlying.append(flattened) + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala index b4f1e690f6..41ed99eed3 100644 --- a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala @@ -1,10 +1,12 @@ package akka.stream.dsl +import akka.stream.impl.Ast +import org.reactivestreams.{ Subscriber, Publisher } + import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration -import akka.stream.OverflowStrategy -import akka.stream.FlattenStrategy +import akka.stream.{ Transformer, OverflowStrategy, FlattenStrategy } sealed trait Flow[-In, +Out] { val transform: Transform[In, Out] @@ -28,10 +30,26 @@ object From { * Example usage: Flow(Future { 1 }) */ def apply[T](f: Future[T]): OpenOutputFlow[T, T] = From[T].withInput(FutureIn(f)) + + /** + * Helper to create Flow with Input from Publisher. + */ + def apply[T](p: Publisher[T]): OpenOutputFlow[T, T] = From[T].withInput(PublisherIn(p)) } trait Input[-In] +/** + * Default input. + * Allows to materialize a Flow with this input to Subscriber. + */ +final case class SubscriberIn[-In]() extends Input[In] + +/** + * Input from Publisher. + */ +final case class PublisherIn[-In](p: Publisher[_ >: In]) extends Input[In] + /** * Input from Iterable * @@ -48,11 +66,28 @@ final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[I */ final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In] -trait Output[+Out] +trait Output[+Out] { + def transform[T, O >: Out]: Transform[O, T] = EmptyTransform[O, T] +} -final case class FutureOut[+Out]() extends Output[Out] +/** + * Default output. Does no reducing operations. + * Allows to materialize a Flow with this output to Publisher. + */ final case class PublisherOut[+Out]() extends Output[Out] +/** + * Output to a Subscriber. + */ +final case class SubscriberOut[+Out](s: Subscriber[_ <: Out]) extends Output[Out] + +/** + * Fold output. Reduces output stream according to the given fold function. + */ +final case class FoldOut[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Output[Out] { + override def transform[T, O >: Out]: Transform[O, T] = EmptyTransform[O, T] +} + /** * Operations with a Flow which has open (no attached) Input. * @@ -95,8 +130,6 @@ trait HasOpenOutput[-In, +Out] { appendTransform(EmptyTransform[Out, Out]()) def collect[T](pf: PartialFunction[Out, T]): Repr[In, T] = appendTransform(EmptyTransform[Out, T]()) - def fold[T](zero: T)(f: (T, Out) ⇒ T): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) def drop(n: Int): Repr[In, Out] = appendTransform(EmptyTransform[Out, Out]()) def dropWithin(d: FiniteDuration): Repr[In, Out] = @@ -111,6 +144,8 @@ trait HasOpenOutput[-In, +Out] { appendTransform(EmptyTransform[Out, immutable.Seq[Out]]()) def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[In, T] = appendTransform(EmptyTransform[Out, T]()) + def transform[T](transformer: Transformer[Out, T]): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[In, S] = appendTransform(EmptyTransform[Out, S]()) def expand[S, O](seed: Out ⇒ S, extrapolate: S ⇒ (O, S)): Repr[In, O] = @@ -127,8 +162,8 @@ trait HasOpenOutput[-In, +Out] { appendTransform(EmptyTransform[Out, OpenOutputFlow[O, O]]()) // linear combinators which consume multiple flows - def flatten[O >: Out](strategy: FlattenStrategy[Out, O]): Repr[In, O] = - appendTransform(EmptyTransform[Out, O]()) + def flatten[T](strategy: FlattenStrategy[Out, T]): Repr[In, T] = + appendTransform(EmptyTransform[Out, T]()) // linear combinators with flows def append[T](f: OpenFlow[Out, T]): Repr[In, T] = @@ -142,7 +177,7 @@ final case class OpenFlow[-In, +Out](transform: Transform[In, Out]) extends Flow type AfterCloseOutput[-In, +Out] = OpenInputFlow[In, Out] type AfterCloseInput[-In, +Out] = OpenOutputFlow[In, Out] - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform) + def withOutput[O](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform ++ out.transform) def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = OpenOutputFlow(in, transform) protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = OpenFlow(t ++ transform) @@ -164,7 +199,7 @@ final case class OpenOutputFlow[-In, +Out](input: Input[In], transform: Transfor override type Repr[-In, +Out] = OpenOutputFlow[In, Out] type AfterCloseOutput[-In, +Out] = ClosedFlow[In, Out] - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform) + def withOutput[O](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform ++ out.transform) def withoutInput: OpenFlow[In, Out] = OpenFlow(transform) protected def appendTransform[T](t: Transform[Out, T]) = OpenOutputFlow(input, transform ++ t) @@ -175,9 +210,50 @@ final case class ClosedFlow[-In, +Out](input: Input[In], output: Output[Out], tr def withoutInput: OpenInputFlow[In, Out] = OpenInputFlow(output, transform) def run(): Unit = () + + def as[O >: Out, R[_]](implicit materializer: FlowMaterializer.M[O, R[O]]): R[O] = ??? } trait Transform[-In, +Out] { def ++[T](t: Transform[Out, T]): Transform[In, T] = EmptyTransform[In, T]() } final case class EmptyTransform[-In, +Out]() extends Transform[In, Out] + +object FlattenStrategy { + def concatOpenOutputFlow[In, Out]: FlattenStrategy[OpenOutputFlow[In, Out], Out] = ConcatOpenOutputFlow[In, Out]() + def concatOpenFlow[In, Out]: FlattenStrategy[OpenFlow[In, Out], Out] = ConcatOpenFlow[In, Out]() + + final case class ConcatOpenOutputFlow[In, Out]() extends FlattenStrategy[OpenOutputFlow[In, Out], Out] + final case class ConcatOpenFlow[In, Out]() extends FlattenStrategy[OpenFlow[In, Out], Out] +} + +/** + * At the end we need to materialize the stream. It could be done by setting an output + * with a transformation (e.g Out -> Publisher[Out]) which would do the materialization. + * + * Or we could have an additional step on ClosedFlow like the one Viktor suggested. + * https://github.com/akka/akka/issues/15633#issuecomment-52307292 + */ +object FlowMaterializer { + trait M[T, R] { + def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): R + } + + def publisher[T]: FlowMaterializer.M[T, Publisher[T]] = publisizer.asInstanceOf[FlowMaterializer.M[T, Publisher[T]]] + private[this] final val publisizer = mkPublisher[Any] + private[this] def mkPublisher[T] = new FlowMaterializer.M[T, Publisher[T]] { + def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Publisher[T] = ??? + } + + def subscriber[T]: FlowMaterializer.M[T, Subscriber[T]] = publisizer.asInstanceOf[FlowMaterializer.M[T, Subscriber[T]]] + private[this] final val sbscrisizer = mkSubscriber[Any] + private[this] def mkSubscriber[T] = new FlowMaterializer.M[T, Subscriber[T]] { + def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Subscriber[T] = ??? + } + + def future[T]: FlowMaterializer.M[T, Future[T]] = futurizer.asInstanceOf[M[T, Future[T]]] + private[this] final val futurizer = mkFuture[Any] + private[this] def mkFuture[T] = new M[T, Future[T]] { + def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Future[T] = ??? + } +} diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala index 6a329df5fd..3030f6cff9 100644 --- a/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala @@ -1,19 +1,25 @@ package akka.stream.dsl -/** - * Dummy implementation needed for runtime tests. - */ -trait Graph { - - def merge[T](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, T], destination: HasOpenInput[T, _]) = () - def zip[T, U](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, U], destination: HasOpenInput[(T, U), _]) = () - def concat[T](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, T], destination: HasOpenInput[T, _]) = () - def broadcast[T](source: HasOpenOutput[_, T], destinations: Seq[HasOpenInput[T, _]]) = () +final case class Merge[T, U, V >: T with U]() { + val in1 = new Output[T] {} + val in2 = new Output[U] {} + val out = new Input[V] {} } -object Graph { - def apply(): Graph = new Graph {} +final case class Zip[T, U]() { + val in1 = new Output[T] {} + val in2 = new Output[U] {} + val out = new Input[(T, U)] {} } -final case class Broadcast[T]() extends Input[T] with Output[T] -final case class Zip[T]() extends Input[T] with Output[T] +final case class Concat[T, U, V >: T with U]() { + val in1 = new Output[T] {} + val in2 = new Output[U] {} + val out = new Input[V] {} +} + +final case class Broadcast[T]() { + val in = new Output[T] {} + val out1 = new Input[T] {} + val out2 = new Input[T] {} +} diff --git a/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala index c5d783dfe8..819aa5552c 100644 --- a/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala @@ -27,7 +27,8 @@ class CombinatorSpec extends WordSpec with Matchers { } } "fold" in { - val t: OpenFlow[Int, String] = f.fold("elements:") { (soFar, element) ⇒ soFar + element } + val fo = FoldOut("elements:") { (soFar, element: Int) ⇒ soFar + element } + val t: OpenInputFlow[Int, String] = f.withOutput(fo) } "drop" in { val t: OpenFlow[Int, Int] = f.drop(2) @@ -67,20 +68,27 @@ class CombinatorSpec extends WordSpec with Matchers { f.map(_.toString).prefixAndTail(10) } "groupBy" in { - val grouped: OpenOutputFlow[Int, (Int, OpenOutputFlow[Int, Int])] = - From(immutable.Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o % 3) + val grouped: OpenOutputFlow[Int, (String, OpenOutputFlow[Int, Int])] = + From(immutable.Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o.toString) - val closedInner: OpenOutputFlow[Int, (Int, ClosedFlow[Int, Int])] = grouped.map { - case (key, openFlow) ⇒ (key, openFlow.withOutput(FutureOut())) + val closedInner: OpenOutputFlow[Int, (String, ClosedFlow[Int, Int])] = grouped.map { + case (key, openFlow) ⇒ (key, openFlow.withOutput(PublisherOut())) } // both of these compile, even if `grouped` has inner flows unclosed - grouped.withOutput(FutureOut()).run - closedInner.withOutput(FutureOut()).run + grouped.withOutput(PublisherOut()).run + closedInner.withOutput(PublisherOut()).run } "splitWhen" in { val t: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) } } -} \ No newline at end of file + "Linear combinators which consume multiple flows" should { + "flatten" in { + val split: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) + val flattened: OpenFlow[Int, String] = split.flatten(FlattenStrategy.concatOpenOutputFlow) + } + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala index e533f418c6..fe6cee34dd 100644 --- a/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala @@ -17,7 +17,7 @@ class FlowSpec extends WordSpec with Matchers { "go through all states" in { val f: OpenFlow[Int, Int] = From[Int] .withInput(intSeq) - .withOutput(FutureOut()) + .withOutput(PublisherOut()) .withoutInput .withoutOutput } @@ -40,10 +40,10 @@ class FlowSpec extends WordSpec with Matchers { val closedInput: OpenOutputFlow[Int, Int] = open3.withInput(intSeq) "closedInput.run" shouldNot compile - val closedOutput: OpenInputFlow[Int, Int] = open3.withOutput(FutureOut()) + val closedOutput: OpenInputFlow[Int, Int] = open3.withOutput(PublisherOut()) "closedOutput.run" shouldNot compile - closedInput.withOutput(FutureOut()).run + closedInput.withOutput(PublisherOut()).run closedOutput.withInput(intSeq).run } "prepend OpenFlow" in { @@ -55,15 +55,15 @@ class FlowSpec extends WordSpec with Matchers { val closedInput: OpenOutputFlow[String, String] = open3.withInput(strSeq) "closedInput.run" shouldNot compile - val closedOutput: OpenInputFlow[String, String] = open3.withOutput(FutureOut()) + val closedOutput: OpenInputFlow[String, String] = open3.withOutput(PublisherOut()) "closedOutput.run" shouldNot compile - closedInput.withOutput(FutureOut()).run + closedInput.withOutput(PublisherOut()).run closedOutput.withInput(strSeq).run } "append OpenInputFlow" in { val open: OpenFlow[Int, String] = From[Int].map(_.toString) - val closedOutput: OpenInputFlow[String, Int] = From[String].map(_.hashCode).withOutput(FutureOut()) + val closedOutput: OpenInputFlow[String, Int] = From[String].map(_.hashCode).withOutput(PublisherOut()) val appended: OpenInputFlow[Int, Int] = open.append(closedOutput) "appended.run" shouldNot compile "appended.toFuture" shouldNot compile @@ -75,13 +75,13 @@ class FlowSpec extends WordSpec with Matchers { val prepended: OpenOutputFlow[String, String] = open.prepend(closedInput) "prepended.run" shouldNot compile "prepended.withInput(strSeq)" shouldNot compile - prepended.withOutput(FutureOut()).run + prepended.withOutput(PublisherOut()).run } } "OpenInputFlow" should { val openInput: OpenInputFlow[Int, String] = - From[Int].map(_.toString).withOutput(FutureOut()) + From[Int].map(_.toString).withOutput(PublisherOut()) "accept Input" in { openInput.withInput(intSeq) } @@ -103,7 +103,7 @@ class FlowSpec extends WordSpec with Matchers { val openOutput: OpenOutputFlow[Int, String] = From(Seq(1, 2, 3)).map(_.toString) "accept Output" in { - openOutput.withOutput(FutureOut()) + openOutput.withOutput(PublisherOut()) } "drop Input" in { openOutput.withoutInput @@ -121,7 +121,7 @@ class FlowSpec extends WordSpec with Matchers { "ClosedFlow" should { val closed: ClosedFlow[Int, String] = - From(Seq(1, 2, 3)).map(_.toString).withOutput(FutureOut()) + From(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut()) "run" in { closed.run } diff --git a/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala index f50f25be45..46647af26f 100644 --- a/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala @@ -6,44 +6,52 @@ import scala.collection.immutable.Seq class GraphSpec extends WordSpec with Matchers { - val intSeq = IterableIn(Seq(1, 2, 3)) - "Graph" should { "merge" in { - val in1 = From[Int] - val in2 = From[Int] - val out1 = From[Int] - val out2 = From[String] + val merge = Merge[Int, Int, Int]() - Graph().merge(in1, in2, out1) - "Graph().merge(in1, in2, out2)" shouldNot compile + val in1 = From[Int].withOutput(merge.in1) + val in2 = From[Int].withOutput(merge.in2) + val out1 = From[Int].withInput(merge.out) + + val out2 = From[String] + // FIXME: make me not compile + //"out2.withInput(merge.out)" shouldNot compile } "zip" in { - val in1 = From[Int] - val in2 = From[String] - val out1 = From[(Int, String)] - val out2 = From[(String, Int)] + val zip = Zip[Int, String]() - Graph().zip(in1, in2, out1) - "Graph().zip(in1, in2, out2)" shouldNot compile + val in1 = From[Int].withOutput(zip.in1) + val in2 = From[String].withOutput(zip.in2) + val out1 = From[(Int, String)].withInput(zip.out) + + val out2 = From[(String, Int)] + // FIXME: make me not compile + //"out2.withInput(zip.out)" shouldNot compile } "concat" in { - val in1 = From[Int] - val in2 = From[Int] - val out1 = From[Int] - val out2 = From[String] + trait A + trait B extends A - Graph().concat(in1, in2, out1) - "Graph().concat(in1, in2, out2)" shouldNot compile + val concat = Concat[A, B, A]() + val in1 = From[A].withOutput(concat.in1) + val in2 = From[B].withOutput(concat.in2) + val out1 = From[A].withInput(concat.out) + + val out2 = From[String] + // FIXME: make me not compile + //"out2.withInput(concat.out)" shouldNot compile } "broadcast" in { - val in1 = From[Int].map(_ * 2) - val in2 = From[Int].map(_.toString) - val out1 = From[Int].map(_.toString) - val out2 = From[Int].filter(_ % 2 == 0) + val broadcast = Broadcast[Int]() - Graph().broadcast(in1, Seq(out1, out2)) - "Graph().broadcast(in2, Seq(out1, out2))" shouldNot compile + val in1 = From[Int].withOutput(broadcast.in) + val in2 = From[Int].withInput(broadcast.out1) + val out1 = From[Int].withInput(broadcast.out2) + + val out2 = From[String] + // FIXME: make me not compile + //"out2.withInput(broadcast.out2)" shouldNot compile } } }