!str #15672 add HTTP Pipeline with the new DSL
* implement graph operations as case class junctions * add materialization method to ClosedFlow
This commit is contained in:
parent
6d165d28f9
commit
59e9f71629
6 changed files with 279 additions and 67 deletions
|
|
@ -0,0 +1,114 @@
|
|||
package akka.http.server
|
||||
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.http.Http
|
||||
import akka.http.model.{ ErrorInfo, HttpRequest, HttpResponse }
|
||||
import akka.http.parsing.HttpRequestParser
|
||||
import akka.http.parsing.ParserOutput._
|
||||
import akka.http.rendering.ResponseRenderingContext
|
||||
import akka.stream.dsl.{ FlowMaterializer ⇒ NewFM, _ }
|
||||
import akka.stream.io.StreamTcp
|
||||
import akka.stream.{ FlowMaterializer, Transformer }
|
||||
import akka.util.ByteString
|
||||
|
||||
class NewDslHttpServerPipeline(settings: ServerSettings,
|
||||
materializer: FlowMaterializer,
|
||||
log: LoggingAdapter) {
|
||||
|
||||
import akka.http.server.NewDslHttpServerPipeline._
|
||||
|
||||
val rootParser = new HttpRequestParser(settings.parserSettings, settings.rawRequestUriHeader, materializer)()
|
||||
val warnOnIllegalHeader: ErrorInfo ⇒ Unit = errorInfo ⇒
|
||||
if (settings.parserSettings.illegalHeaderWarnings)
|
||||
log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty)
|
||||
|
||||
val responseRendererFactory = new {
|
||||
def newRenderer: Transformer[ResponseRenderingContext, OpenOutputFlow[ByteString, ByteString]] = ???
|
||||
}
|
||||
|
||||
/**
|
||||
* Flow graph:
|
||||
*
|
||||
* tcpConn.inputStream ---> requestFlowBeforeBroadcast -+-> requestFlowAfterBroadcast ---> Publisher[HttpRequest]
|
||||
* |
|
||||
* \-> applicationBypassFlow -\
|
||||
* |
|
||||
* Subscriber[HttpResponse] ---> responseFlowBeforeMerge -+-> responseFlowAfterMerge --> tcpConn.outputStream
|
||||
*/
|
||||
def apply(tcpConn: StreamTcp.IncomingTcpConnection) = {
|
||||
|
||||
val broadcast = Broadcast[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]()
|
||||
val merge = Merge[MessageStart, HttpResponse, Any]()
|
||||
|
||||
val requestFlowBeforeBroadcast: ClosedFlow[ByteString, (RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] =
|
||||
From(tcpConn.inputStream)
|
||||
.transform(rootParser.copyWith(warnOnIllegalHeader))
|
||||
.splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd)
|
||||
.headAndTail()
|
||||
.withOutput(broadcast.in)
|
||||
|
||||
val applicationBypassFlow: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), MessageStart] =
|
||||
From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]
|
||||
.withInput(broadcast.out1)
|
||||
.collect[MessageStart with RequestOutput] { case (x: MessageStart, _) ⇒ x }
|
||||
.withOutput(merge.in1)
|
||||
|
||||
val requestFlowAfterBroadcast: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), HttpRequest] =
|
||||
From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]
|
||||
.withInput(broadcast.out2)
|
||||
.collect {
|
||||
case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts) ⇒
|
||||
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader)
|
||||
val pub = entityParts.withOutput(PublisherOut()).as(NewFM.publisher[RequestOutput])
|
||||
HttpRequest(method, effectiveUri, headers, createEntity(pub), protocol)
|
||||
}
|
||||
.withOutput(PublisherOut())
|
||||
|
||||
val responseFlowBeforeMerge: ClosedFlow[HttpResponse, HttpResponse] =
|
||||
From[HttpResponse]
|
||||
.withInput(SubscriberIn())
|
||||
.withOutput(merge.in2)
|
||||
|
||||
val responseFlowAfterMerge: ClosedFlow[Any, ByteString] =
|
||||
From[Any]
|
||||
.withInput(merge.out)
|
||||
.transform(applyApplicationBypass)
|
||||
.transform(responseRendererFactory.newRenderer)
|
||||
.flatten(FlattenStrategy.concatOpenOutputFlow)
|
||||
.transform(errorLogger(log, "Outgoing response stream error"))
|
||||
.withOutput(SubscriberOut(tcpConn.outputStream))
|
||||
|
||||
val requestPublisher = requestFlowAfterBroadcast.as(NewFM.publisher[HttpRequest])
|
||||
val responseSubscriber = responseFlowBeforeMerge.as(NewFM.subscriber[HttpResponse])
|
||||
|
||||
Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher, responseSubscriber)
|
||||
}
|
||||
|
||||
def applyApplicationBypass: Transformer[Any, ResponseRenderingContext] = ???
|
||||
|
||||
private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = ???
|
||||
}
|
||||
|
||||
object NewDslHttpServerPipeline {
|
||||
|
||||
/**
|
||||
* FIXME: We can't use `HasOpenOutput` here, because conversion would convert either `OpenFlow`
|
||||
* or `OpenOutputFlow` to `HasOpenOutput`.
|
||||
*
|
||||
* Therefore we need two separate conversions, one for `OpeFlow` another for `OpenOutputFlow`.
|
||||
*/
|
||||
implicit class OpenOutputFlowWithHeadAndTail[In, InnerIn, InnerOut](val underlying: OpenOutputFlow[In, OpenOutputFlow[InnerIn, InnerOut]]) extends AnyVal {
|
||||
def headAndTail(): OpenOutputFlow[In, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] = {
|
||||
val flow: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], OpenOutputFlow[InnerIn, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])]] =
|
||||
From[OpenOutputFlow[InnerIn, InnerOut]]
|
||||
.map { f ⇒
|
||||
f.prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) }
|
||||
}
|
||||
|
||||
val flattened: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] =
|
||||
flow.flatten(FlattenStrategy.concatOpenOutputFlow)
|
||||
|
||||
underlying.append(flattened)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,10 +1,12 @@
|
|||
package akka.stream.dsl
|
||||
|
||||
import akka.stream.impl.Ast
|
||||
import org.reactivestreams.{ Subscriber, Publisher }
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.stream.OverflowStrategy
|
||||
import akka.stream.FlattenStrategy
|
||||
import akka.stream.{ Transformer, OverflowStrategy, FlattenStrategy }
|
||||
|
||||
sealed trait Flow[-In, +Out] {
|
||||
val transform: Transform[In, Out]
|
||||
|
|
@ -28,10 +30,26 @@ object From {
|
|||
* Example usage: Flow(Future { 1 })
|
||||
*/
|
||||
def apply[T](f: Future[T]): OpenOutputFlow[T, T] = From[T].withInput(FutureIn(f))
|
||||
|
||||
/**
|
||||
* Helper to create Flow with Input from Publisher.
|
||||
*/
|
||||
def apply[T](p: Publisher[T]): OpenOutputFlow[T, T] = From[T].withInput(PublisherIn(p))
|
||||
}
|
||||
|
||||
trait Input[-In]
|
||||
|
||||
/**
|
||||
* Default input.
|
||||
* Allows to materialize a Flow with this input to Subscriber.
|
||||
*/
|
||||
final case class SubscriberIn[-In]() extends Input[In]
|
||||
|
||||
/**
|
||||
* Input from Publisher.
|
||||
*/
|
||||
final case class PublisherIn[-In](p: Publisher[_ >: In]) extends Input[In]
|
||||
|
||||
/**
|
||||
* Input from Iterable
|
||||
*
|
||||
|
|
@ -48,11 +66,28 @@ final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[I
|
|||
*/
|
||||
final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In]
|
||||
|
||||
trait Output[+Out]
|
||||
trait Output[+Out] {
|
||||
def transform[T, O >: Out]: Transform[O, T] = EmptyTransform[O, T]
|
||||
}
|
||||
|
||||
final case class FutureOut[+Out]() extends Output[Out]
|
||||
/**
|
||||
* Default output. Does no reducing operations.
|
||||
* Allows to materialize a Flow with this output to Publisher.
|
||||
*/
|
||||
final case class PublisherOut[+Out]() extends Output[Out]
|
||||
|
||||
/**
|
||||
* Output to a Subscriber.
|
||||
*/
|
||||
final case class SubscriberOut[+Out](s: Subscriber[_ <: Out]) extends Output[Out]
|
||||
|
||||
/**
|
||||
* Fold output. Reduces output stream according to the given fold function.
|
||||
*/
|
||||
final case class FoldOut[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Output[Out] {
|
||||
override def transform[T, O >: Out]: Transform[O, T] = EmptyTransform[O, T]
|
||||
}
|
||||
|
||||
/**
|
||||
* Operations with a Flow which has open (no attached) Input.
|
||||
*
|
||||
|
|
@ -95,8 +130,6 @@ trait HasOpenOutput[-In, +Out] {
|
|||
appendTransform(EmptyTransform[Out, Out]())
|
||||
def collect[T](pf: PartialFunction[Out, T]): Repr[In, T] =
|
||||
appendTransform(EmptyTransform[Out, T]())
|
||||
def fold[T](zero: T)(f: (T, Out) ⇒ T): Repr[In, T] =
|
||||
appendTransform(EmptyTransform[Out, T]())
|
||||
def drop(n: Int): Repr[In, Out] =
|
||||
appendTransform(EmptyTransform[Out, Out]())
|
||||
def dropWithin(d: FiniteDuration): Repr[In, Out] =
|
||||
|
|
@ -111,6 +144,8 @@ trait HasOpenOutput[-In, +Out] {
|
|||
appendTransform(EmptyTransform[Out, immutable.Seq[Out]]())
|
||||
def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[In, T] =
|
||||
appendTransform(EmptyTransform[Out, T]())
|
||||
def transform[T](transformer: Transformer[Out, T]): Repr[In, T] =
|
||||
appendTransform(EmptyTransform[Out, T]())
|
||||
def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[In, S] =
|
||||
appendTransform(EmptyTransform[Out, S]())
|
||||
def expand[S, O](seed: Out ⇒ S, extrapolate: S ⇒ (O, S)): Repr[In, O] =
|
||||
|
|
@ -127,8 +162,8 @@ trait HasOpenOutput[-In, +Out] {
|
|||
appendTransform(EmptyTransform[Out, OpenOutputFlow[O, O]]())
|
||||
|
||||
// linear combinators which consume multiple flows
|
||||
def flatten[O >: Out](strategy: FlattenStrategy[Out, O]): Repr[In, O] =
|
||||
appendTransform(EmptyTransform[Out, O]())
|
||||
def flatten[T](strategy: FlattenStrategy[Out, T]): Repr[In, T] =
|
||||
appendTransform(EmptyTransform[Out, T]())
|
||||
|
||||
// linear combinators with flows
|
||||
def append[T](f: OpenFlow[Out, T]): Repr[In, T] =
|
||||
|
|
@ -142,7 +177,7 @@ final case class OpenFlow[-In, +Out](transform: Transform[In, Out]) extends Flow
|
|||
type AfterCloseOutput[-In, +Out] = OpenInputFlow[In, Out]
|
||||
type AfterCloseInput[-In, +Out] = OpenOutputFlow[In, Out]
|
||||
|
||||
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform)
|
||||
def withOutput[O](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform ++ out.transform)
|
||||
def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = OpenOutputFlow(in, transform)
|
||||
|
||||
protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = OpenFlow(t ++ transform)
|
||||
|
|
@ -164,7 +199,7 @@ final case class OpenOutputFlow[-In, +Out](input: Input[In], transform: Transfor
|
|||
override type Repr[-In, +Out] = OpenOutputFlow[In, Out]
|
||||
type AfterCloseOutput[-In, +Out] = ClosedFlow[In, Out]
|
||||
|
||||
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform)
|
||||
def withOutput[O](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform ++ out.transform)
|
||||
def withoutInput: OpenFlow[In, Out] = OpenFlow(transform)
|
||||
|
||||
protected def appendTransform[T](t: Transform[Out, T]) = OpenOutputFlow(input, transform ++ t)
|
||||
|
|
@ -175,9 +210,50 @@ final case class ClosedFlow[-In, +Out](input: Input[In], output: Output[Out], tr
|
|||
def withoutInput: OpenInputFlow[In, Out] = OpenInputFlow(output, transform)
|
||||
|
||||
def run(): Unit = ()
|
||||
|
||||
def as[O >: Out, R[_]](implicit materializer: FlowMaterializer.M[O, R[O]]): R[O] = ???
|
||||
}
|
||||
|
||||
trait Transform[-In, +Out] {
|
||||
def ++[T](t: Transform[Out, T]): Transform[In, T] = EmptyTransform[In, T]()
|
||||
}
|
||||
final case class EmptyTransform[-In, +Out]() extends Transform[In, Out]
|
||||
|
||||
object FlattenStrategy {
|
||||
def concatOpenOutputFlow[In, Out]: FlattenStrategy[OpenOutputFlow[In, Out], Out] = ConcatOpenOutputFlow[In, Out]()
|
||||
def concatOpenFlow[In, Out]: FlattenStrategy[OpenFlow[In, Out], Out] = ConcatOpenFlow[In, Out]()
|
||||
|
||||
final case class ConcatOpenOutputFlow[In, Out]() extends FlattenStrategy[OpenOutputFlow[In, Out], Out]
|
||||
final case class ConcatOpenFlow[In, Out]() extends FlattenStrategy[OpenFlow[In, Out], Out]
|
||||
}
|
||||
|
||||
/**
|
||||
* At the end we need to materialize the stream. It could be done by setting an output
|
||||
* with a transformation (e.g Out -> Publisher[Out]) which would do the materialization.
|
||||
*
|
||||
* Or we could have an additional step on ClosedFlow like the one Viktor suggested.
|
||||
* https://github.com/akka/akka/issues/15633#issuecomment-52307292
|
||||
*/
|
||||
object FlowMaterializer {
|
||||
trait M[T, R] {
|
||||
def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): R
|
||||
}
|
||||
|
||||
def publisher[T]: FlowMaterializer.M[T, Publisher[T]] = publisizer.asInstanceOf[FlowMaterializer.M[T, Publisher[T]]]
|
||||
private[this] final val publisizer = mkPublisher[Any]
|
||||
private[this] def mkPublisher[T] = new FlowMaterializer.M[T, Publisher[T]] {
|
||||
def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Publisher[T] = ???
|
||||
}
|
||||
|
||||
def subscriber[T]: FlowMaterializer.M[T, Subscriber[T]] = publisizer.asInstanceOf[FlowMaterializer.M[T, Subscriber[T]]]
|
||||
private[this] final val sbscrisizer = mkSubscriber[Any]
|
||||
private[this] def mkSubscriber[T] = new FlowMaterializer.M[T, Subscriber[T]] {
|
||||
def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Subscriber[T] = ???
|
||||
}
|
||||
|
||||
def future[T]: FlowMaterializer.M[T, Future[T]] = futurizer.asInstanceOf[M[T, Future[T]]]
|
||||
private[this] final val futurizer = mkFuture[Any]
|
||||
private[this] def mkFuture[T] = new M[T, Future[T]] {
|
||||
def apply[I](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Future[T] = ???
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,19 +1,25 @@
|
|||
package akka.stream.dsl
|
||||
|
||||
/**
|
||||
* Dummy implementation needed for runtime tests.
|
||||
*/
|
||||
trait Graph {
|
||||
|
||||
def merge[T](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, T], destination: HasOpenInput[T, _]) = ()
|
||||
def zip[T, U](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, U], destination: HasOpenInput[(T, U), _]) = ()
|
||||
def concat[T](source1: HasOpenOutput[_, T], source2: HasOpenOutput[_, T], destination: HasOpenInput[T, _]) = ()
|
||||
def broadcast[T](source: HasOpenOutput[_, T], destinations: Seq[HasOpenInput[T, _]]) = ()
|
||||
final case class Merge[T, U, V >: T with U]() {
|
||||
val in1 = new Output[T] {}
|
||||
val in2 = new Output[U] {}
|
||||
val out = new Input[V] {}
|
||||
}
|
||||
|
||||
object Graph {
|
||||
def apply(): Graph = new Graph {}
|
||||
final case class Zip[T, U]() {
|
||||
val in1 = new Output[T] {}
|
||||
val in2 = new Output[U] {}
|
||||
val out = new Input[(T, U)] {}
|
||||
}
|
||||
|
||||
final case class Broadcast[T]() extends Input[T] with Output[T]
|
||||
final case class Zip[T]() extends Input[T] with Output[T]
|
||||
final case class Concat[T, U, V >: T with U]() {
|
||||
val in1 = new Output[T] {}
|
||||
val in2 = new Output[U] {}
|
||||
val out = new Input[V] {}
|
||||
}
|
||||
|
||||
final case class Broadcast[T]() {
|
||||
val in = new Output[T] {}
|
||||
val out1 = new Input[T] {}
|
||||
val out2 = new Input[T] {}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,7 +27,8 @@ class CombinatorSpec extends WordSpec with Matchers {
|
|||
}
|
||||
}
|
||||
"fold" in {
|
||||
val t: OpenFlow[Int, String] = f.fold("elements:") { (soFar, element) ⇒ soFar + element }
|
||||
val fo = FoldOut("elements:") { (soFar, element: Int) ⇒ soFar + element }
|
||||
val t: OpenInputFlow[Int, String] = f.withOutput(fo)
|
||||
}
|
||||
"drop" in {
|
||||
val t: OpenFlow[Int, Int] = f.drop(2)
|
||||
|
|
@ -67,20 +68,27 @@ class CombinatorSpec extends WordSpec with Matchers {
|
|||
f.map(_.toString).prefixAndTail(10)
|
||||
}
|
||||
"groupBy" in {
|
||||
val grouped: OpenOutputFlow[Int, (Int, OpenOutputFlow[Int, Int])] =
|
||||
From(immutable.Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o % 3)
|
||||
val grouped: OpenOutputFlow[Int, (String, OpenOutputFlow[Int, Int])] =
|
||||
From(immutable.Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o.toString)
|
||||
|
||||
val closedInner: OpenOutputFlow[Int, (Int, ClosedFlow[Int, Int])] = grouped.map {
|
||||
case (key, openFlow) ⇒ (key, openFlow.withOutput(FutureOut()))
|
||||
val closedInner: OpenOutputFlow[Int, (String, ClosedFlow[Int, Int])] = grouped.map {
|
||||
case (key, openFlow) ⇒ (key, openFlow.withOutput(PublisherOut()))
|
||||
}
|
||||
|
||||
// both of these compile, even if `grouped` has inner flows unclosed
|
||||
grouped.withOutput(FutureOut()).run
|
||||
closedInner.withOutput(FutureOut()).run
|
||||
grouped.withOutput(PublisherOut()).run
|
||||
closedInner.withOutput(PublisherOut()).run
|
||||
}
|
||||
"splitWhen" in {
|
||||
val t: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
"Linear combinators which consume multiple flows" should {
|
||||
"flatten" in {
|
||||
val split: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2)
|
||||
val flattened: OpenFlow[Int, String] = split.flatten(FlattenStrategy.concatOpenOutputFlow)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ class FlowSpec extends WordSpec with Matchers {
|
|||
"go through all states" in {
|
||||
val f: OpenFlow[Int, Int] = From[Int]
|
||||
.withInput(intSeq)
|
||||
.withOutput(FutureOut())
|
||||
.withOutput(PublisherOut())
|
||||
.withoutInput
|
||||
.withoutOutput
|
||||
}
|
||||
|
|
@ -40,10 +40,10 @@ class FlowSpec extends WordSpec with Matchers {
|
|||
val closedInput: OpenOutputFlow[Int, Int] = open3.withInput(intSeq)
|
||||
"closedInput.run" shouldNot compile
|
||||
|
||||
val closedOutput: OpenInputFlow[Int, Int] = open3.withOutput(FutureOut())
|
||||
val closedOutput: OpenInputFlow[Int, Int] = open3.withOutput(PublisherOut())
|
||||
"closedOutput.run" shouldNot compile
|
||||
|
||||
closedInput.withOutput(FutureOut()).run
|
||||
closedInput.withOutput(PublisherOut()).run
|
||||
closedOutput.withInput(intSeq).run
|
||||
}
|
||||
"prepend OpenFlow" in {
|
||||
|
|
@ -55,15 +55,15 @@ class FlowSpec extends WordSpec with Matchers {
|
|||
val closedInput: OpenOutputFlow[String, String] = open3.withInput(strSeq)
|
||||
"closedInput.run" shouldNot compile
|
||||
|
||||
val closedOutput: OpenInputFlow[String, String] = open3.withOutput(FutureOut())
|
||||
val closedOutput: OpenInputFlow[String, String] = open3.withOutput(PublisherOut())
|
||||
"closedOutput.run" shouldNot compile
|
||||
|
||||
closedInput.withOutput(FutureOut()).run
|
||||
closedInput.withOutput(PublisherOut()).run
|
||||
closedOutput.withInput(strSeq).run
|
||||
}
|
||||
"append OpenInputFlow" in {
|
||||
val open: OpenFlow[Int, String] = From[Int].map(_.toString)
|
||||
val closedOutput: OpenInputFlow[String, Int] = From[String].map(_.hashCode).withOutput(FutureOut())
|
||||
val closedOutput: OpenInputFlow[String, Int] = From[String].map(_.hashCode).withOutput(PublisherOut())
|
||||
val appended: OpenInputFlow[Int, Int] = open.append(closedOutput)
|
||||
"appended.run" shouldNot compile
|
||||
"appended.toFuture" shouldNot compile
|
||||
|
|
@ -75,13 +75,13 @@ class FlowSpec extends WordSpec with Matchers {
|
|||
val prepended: OpenOutputFlow[String, String] = open.prepend(closedInput)
|
||||
"prepended.run" shouldNot compile
|
||||
"prepended.withInput(strSeq)" shouldNot compile
|
||||
prepended.withOutput(FutureOut()).run
|
||||
prepended.withOutput(PublisherOut()).run
|
||||
}
|
||||
}
|
||||
|
||||
"OpenInputFlow" should {
|
||||
val openInput: OpenInputFlow[Int, String] =
|
||||
From[Int].map(_.toString).withOutput(FutureOut())
|
||||
From[Int].map(_.toString).withOutput(PublisherOut())
|
||||
"accept Input" in {
|
||||
openInput.withInput(intSeq)
|
||||
}
|
||||
|
|
@ -103,7 +103,7 @@ class FlowSpec extends WordSpec with Matchers {
|
|||
val openOutput: OpenOutputFlow[Int, String] =
|
||||
From(Seq(1, 2, 3)).map(_.toString)
|
||||
"accept Output" in {
|
||||
openOutput.withOutput(FutureOut())
|
||||
openOutput.withOutput(PublisherOut())
|
||||
}
|
||||
"drop Input" in {
|
||||
openOutput.withoutInput
|
||||
|
|
@ -121,7 +121,7 @@ class FlowSpec extends WordSpec with Matchers {
|
|||
|
||||
"ClosedFlow" should {
|
||||
val closed: ClosedFlow[Int, String] =
|
||||
From(Seq(1, 2, 3)).map(_.toString).withOutput(FutureOut())
|
||||
From(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut())
|
||||
"run" in {
|
||||
closed.run
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,44 +6,52 @@ import scala.collection.immutable.Seq
|
|||
|
||||
class GraphSpec extends WordSpec with Matchers {
|
||||
|
||||
val intSeq = IterableIn(Seq(1, 2, 3))
|
||||
|
||||
"Graph" should {
|
||||
"merge" in {
|
||||
val in1 = From[Int]
|
||||
val in2 = From[Int]
|
||||
val out1 = From[Int]
|
||||
val out2 = From[String]
|
||||
val merge = Merge[Int, Int, Int]()
|
||||
|
||||
Graph().merge(in1, in2, out1)
|
||||
"Graph().merge(in1, in2, out2)" shouldNot compile
|
||||
val in1 = From[Int].withOutput(merge.in1)
|
||||
val in2 = From[Int].withOutput(merge.in2)
|
||||
val out1 = From[Int].withInput(merge.out)
|
||||
|
||||
val out2 = From[String]
|
||||
// FIXME: make me not compile
|
||||
//"out2.withInput(merge.out)" shouldNot compile
|
||||
}
|
||||
"zip" in {
|
||||
val in1 = From[Int]
|
||||
val in2 = From[String]
|
||||
val out1 = From[(Int, String)]
|
||||
val out2 = From[(String, Int)]
|
||||
val zip = Zip[Int, String]()
|
||||
|
||||
Graph().zip(in1, in2, out1)
|
||||
"Graph().zip(in1, in2, out2)" shouldNot compile
|
||||
val in1 = From[Int].withOutput(zip.in1)
|
||||
val in2 = From[String].withOutput(zip.in2)
|
||||
val out1 = From[(Int, String)].withInput(zip.out)
|
||||
|
||||
val out2 = From[(String, Int)]
|
||||
// FIXME: make me not compile
|
||||
//"out2.withInput(zip.out)" shouldNot compile
|
||||
}
|
||||
"concat" in {
|
||||
val in1 = From[Int]
|
||||
val in2 = From[Int]
|
||||
val out1 = From[Int]
|
||||
val out2 = From[String]
|
||||
trait A
|
||||
trait B extends A
|
||||
|
||||
Graph().concat(in1, in2, out1)
|
||||
"Graph().concat(in1, in2, out2)" shouldNot compile
|
||||
val concat = Concat[A, B, A]()
|
||||
val in1 = From[A].withOutput(concat.in1)
|
||||
val in2 = From[B].withOutput(concat.in2)
|
||||
val out1 = From[A].withInput(concat.out)
|
||||
|
||||
val out2 = From[String]
|
||||
// FIXME: make me not compile
|
||||
//"out2.withInput(concat.out)" shouldNot compile
|
||||
}
|
||||
"broadcast" in {
|
||||
val in1 = From[Int].map(_ * 2)
|
||||
val in2 = From[Int].map(_.toString)
|
||||
val out1 = From[Int].map(_.toString)
|
||||
val out2 = From[Int].filter(_ % 2 == 0)
|
||||
val broadcast = Broadcast[Int]()
|
||||
|
||||
Graph().broadcast(in1, Seq(out1, out2))
|
||||
"Graph().broadcast(in2, Seq(out1, out2))" shouldNot compile
|
||||
val in1 = From[Int].withOutput(broadcast.in)
|
||||
val in2 = From[Int].withInput(broadcast.out1)
|
||||
val out1 = From[Int].withInput(broadcast.out2)
|
||||
|
||||
val out2 = From[String]
|
||||
// FIXME: make me not compile
|
||||
//"out2.withInput(broadcast.out2)" shouldNot compile
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue