Merge pull request #15698 from 2m/wip-typesafe-dsl

WIP Steams typesafe DSL
This commit is contained in:
Martynas Mickevičius 2014-08-26 16:10:00 +02:00
commit fbbe5cd451
6 changed files with 659 additions and 0 deletions

View file

@ -0,0 +1,114 @@
package akka.http.server
import akka.event.LoggingAdapter
import akka.http.Http
import akka.http.model.{ ErrorInfo, HttpRequest, HttpResponse }
import akka.http.parsing.HttpRequestParser
import akka.http.parsing.ParserOutput._
import akka.http.rendering.ResponseRenderingContext
import akka.stream.dsl._
import akka.stream.io.StreamTcp
import akka.stream.{ FlowMaterializer, Transformer }
import akka.util.ByteString
class NewDslHttpServerPipeline(settings: ServerSettings,
materializer: FlowMaterializer,
log: LoggingAdapter) {
import akka.http.server.NewDslHttpServerPipeline._
val rootParser = new HttpRequestParser(settings.parserSettings, settings.rawRequestUriHeader, materializer)()
val warnOnIllegalHeader: ErrorInfo Unit = errorInfo
if (settings.parserSettings.illegalHeaderWarnings)
log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty)
val responseRendererFactory = new {
def newRenderer: Transformer[ResponseRenderingContext, OpenOutputFlow[ByteString, ByteString]] = ???
}
/**
* Flow graph:
*
* tcpConn.inputStream ---> requestFlowBeforeBroadcast -+-> requestFlowAfterBroadcast ---> Publisher[HttpRequest]
* |
* \-> applicationBypassFlow -\
* |
* Subscriber[HttpResponse] ---> responseFlowBeforeMerge -+-> responseFlowAfterMerge --> tcpConn.outputStream
*/
def apply(tcpConn: StreamTcp.IncomingTcpConnection) = {
val broadcast = Broadcast[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]()
val merge = Merge[MessageStart, HttpResponse, Any]()
val requestFlowBeforeBroadcast: ClosedFlow[ByteString, (RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] =
From(tcpConn.inputStream)
.transform(rootParser.copyWith(warnOnIllegalHeader))
.splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd)
.headAndTail()
.withOutput(broadcast.in)
val applicationBypassFlow: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), MessageStart] =
From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]
.withInput(broadcast.out1)
.collect[MessageStart with RequestOutput] { case (x: MessageStart, _) x }
.withOutput(merge.in1)
val requestPublisher = PublisherOut[HttpRequest]()
val requestFlowAfterBroadcast: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), HttpRequest] =
From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]
.withInput(broadcast.out2)
.collect {
case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts)
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader)
val publisher = PublisherOut[RequestOutput]()
val flow = entityParts.withOutput(publisher)
HttpRequest(method, effectiveUri, headers, createEntity(publisher.publisher), protocol)
}
.withOutput(requestPublisher)
val responseSubscriber = SubscriberIn[HttpResponse]()
val responseFlowBeforeMerge: ClosedFlow[HttpResponse, HttpResponse] =
From[HttpResponse]
.withInput(responseSubscriber)
.withOutput(merge.in2)
val responseFlowAfterMerge: ClosedFlow[Any, ByteString] =
From[Any]
.withInput(merge.out)
.transform(applyApplicationBypass)
.transform(responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concatOpenOutputFlow)
.transform(errorLogger(log, "Outgoing response stream error"))
.withOutput(SubscriberOut(tcpConn.outputStream))
Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher.publisher, responseSubscriber.subscriber)
}
def applyApplicationBypass: Transformer[Any, ResponseRenderingContext] = ???
private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = ???
}
object NewDslHttpServerPipeline {
/**
* FIXME: We can't use `HasOpenOutput` here, because conversion would convert either `OpenFlow`
* or `OpenOutputFlow` to `HasOpenOutput`.
*
* Therefore we need two separate conversions, one for `OpeFlow` another for `OpenOutputFlow`.
*/
implicit class OpenOutputFlowWithHeadAndTail[In, InnerIn, InnerOut](val underlying: OpenOutputFlow[In, OpenOutputFlow[InnerIn, InnerOut]]) extends AnyVal {
def headAndTail(): OpenOutputFlow[In, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] = {
val flow: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], OpenOutputFlow[InnerIn, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])]] =
From[OpenOutputFlow[InnerIn, InnerOut]]
.map { f
f.prefixAndTail(1).map { case (prefix, tail) (prefix.head, tail) }
}
val flattened: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] =
flow.flatten(FlattenStrategy.concatOpenOutputFlow)
underlying.append(flattened)
}
}
}

View file

@ -0,0 +1,228 @@
package akka.stream.dsl
import akka.stream.impl.Ast
import org.reactivestreams.{ Subscriber, Publisher }
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.stream.{ Transformer, OverflowStrategy, FlattenStrategy }
sealed trait Flow[-In, +Out] {
val transform: Transform[In, Out]
}
object From {
/**
* Helper to create Flow without Input.
* Example usage: From[Int]
*/
def apply[T]: OpenFlow[T, T] = OpenFlow[T, T](EmptyTransform[T, T]())
/**
* Helper to create Flow with Input from Iterable.
* Example usage: Flow(Seq(1,2,3))
*/
def apply[T](i: immutable.Iterable[T]): OpenOutputFlow[T, T] = From[T].withInput(IterableIn(i))
/**
* Helper to create Flow with Input from Future.
* Example usage: Flow(Future { 1 })
*/
def apply[T](f: Future[T]): OpenOutputFlow[T, T] = From[T].withInput(FutureIn(f))
/**
* Helper to create Flow with Input from Publisher.
*/
def apply[T](p: Publisher[T]): OpenOutputFlow[T, T] = From[T].withInput(PublisherIn(p))
}
trait Input[-In]
/**
* Default input.
* Allows to materialize a Flow with this input to Subscriber.
*/
final case class SubscriberIn[-In]() extends Input[In] {
def subscriber[I <: In]: Subscriber[I] = ???
}
/**
* Input from Publisher.
*/
final case class PublisherIn[-In](p: Publisher[_ >: In]) extends Input[In]
/**
* Input from Iterable
*
* Changing In from Contravariant to Covariant is needed because Iterable[+A].
* But this brakes IterableIn variance and we get IterableIn(Seq(1,2,3)): IterableIn[Any]
*/
final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[In]
/**
* Input from Future
*
* Changing In from Contravariant to Covariant is needed because Future[+A].
* But this brakes FutureIn variance and we get FutureIn(Future{1}): FutureIn[Any]
*/
final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In]
trait Output[+Out]
/**
* Default output.
* Allows to materialize a Flow with this output to Publisher.
*/
final case class PublisherOut[+Out]() extends Output[Out] {
def publisher[O >: Out]: Publisher[O] = ???
}
/**
* Output to a Subscriber.
*/
final case class SubscriberOut[+Out](s: Subscriber[_ <: Out]) extends Output[Out]
/**
* Fold output. Reduces output stream according to the given fold function.
*/
final case class FoldOut[T, +Out](zero: T)(f: (T, Out) T) extends Output[Out] {
def future: Future[T] = ???
}
/**
* Operations with a Flow which has open (no attached) Input.
*
* No Out type parameter would be useful for Graph signatures, but we need it here
* for `withInput` and `prependTransform` methods.
*/
sealed trait HasOpenInput[-In, +Out] {
type Repr[-In, +Out] <: HasOpenInput[In, Out]
type AfterCloseInput[-In, +Out] <: Flow[In, Out]
def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out]
protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out]
// linear combinators with flows
def prepend[T](f: OpenFlow[T, In]): Repr[T, Out] =
prependTransform(f.transform)
def prepend[T](f: OpenOutputFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] =
prependTransform(f.transform).withInput(f.input)
}
/**
* Operations with a Flow which has open (no attached) Output.
*
* No In type parameter would be useful for Graph signatures, but we need it here
* for `withOutput` and `appendTransform` methods.
*/
trait HasOpenOutput[-In, +Out] {
type Repr[-In, +Out] <: HasOpenOutput[In, Out]
type AfterCloseOutput[-In, +Out] <: Flow[In, Out]
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O]
protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T]
// linear simple combinators
def map[T](f: Out T): Repr[In, T] =
appendTransform(EmptyTransform[Out, T]())
def mapFuture[T](f: Out Future[T]): Repr[In, T] =
appendTransform(EmptyTransform[Out, T]())
def filter(p: Out Boolean): Repr[In, Out] =
appendTransform(EmptyTransform[Out, Out]())
def collect[T](pf: PartialFunction[Out, T]): Repr[In, T] =
appendTransform(EmptyTransform[Out, T]())
def drop(n: Int): Repr[In, Out] =
appendTransform(EmptyTransform[Out, Out]())
def dropWithin(d: FiniteDuration): Repr[In, Out] =
appendTransform(EmptyTransform[Out, Out]())
def take(n: Int): Repr[In, Out] =
appendTransform(EmptyTransform[Out, Out]())
def takeWithin(d: FiniteDuration): Repr[In, Out] =
appendTransform(EmptyTransform[Out, Out]())
def grouped(n: Int): Repr[In, immutable.Seq[Out]] =
appendTransform(EmptyTransform[Out, immutable.Seq[Out]]())
def groupedWithin(n: Int, d: FiniteDuration): Repr[In, immutable.Seq[Out]] =
appendTransform(EmptyTransform[Out, immutable.Seq[Out]]())
def mapConcat[T](f: Out immutable.Seq[T]): Repr[In, T] =
appendTransform(EmptyTransform[Out, T]())
def transform[T](transformer: Transformer[Out, T]): Repr[In, T] =
appendTransform(EmptyTransform[Out, T]())
def conflate[S](seed: Out S, aggregate: (S, Out) S): Repr[In, S] =
appendTransform(EmptyTransform[Out, S]())
def expand[S, O](seed: Out S, extrapolate: S (O, S)): Repr[In, O] =
appendTransform(EmptyTransform[Out, O]())
def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[In, Out] =
appendTransform(EmptyTransform[Out, Out]())
// linear combinators which produce multiple flows
def prefixAndTail[O >: Out](n: Int): Repr[In, (immutable.Seq[O], OpenOutputFlow[O, O])] =
appendTransform(EmptyTransform[Out, (immutable.Seq[O], OpenOutputFlow[O, O])]())
def groupBy[O >: Out, K](f: O K): Repr[In, (K, OpenOutputFlow[O, O])] =
appendTransform(EmptyTransform[Out, (K, OpenOutputFlow[O, O])]())
def splitWhen[O >: Out](p: Out Boolean): Repr[In, OpenOutputFlow[O, O]] =
appendTransform(EmptyTransform[Out, OpenOutputFlow[O, O]]())
// linear combinators which consume multiple flows
def flatten[T](strategy: FlattenStrategy[Out, T]): Repr[In, T] =
appendTransform(EmptyTransform[Out, T]())
// linear combinators with flows
def append[T](f: OpenFlow[Out, T]): Repr[In, T] =
appendTransform(f.transform)
def append[T](f: OpenInputFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] =
appendTransform(f.transform).withOutput(f.output)
}
final case class OpenFlow[-In, +Out](transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] with HasOpenInput[In, Out] {
override type Repr[-In, +Out] = OpenFlow[In, Out]
type AfterCloseOutput[-In, +Out] = OpenInputFlow[In, Out]
type AfterCloseInput[-In, +Out] = OpenOutputFlow[In, Out]
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform)
def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = OpenOutputFlow(in, transform)
protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = OpenFlow(t ++ transform)
protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] = OpenFlow(transform ++ t)
}
final case class OpenInputFlow[-In, +Out](output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenInput[In, Out] {
type Repr[-In, +Out] = OpenInputFlow[In, Out]
type AfterCloseInput[-In, +Out] = ClosedFlow[In, Out]
def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = ClosedFlow(in, output, transform)
def withoutOutput: OpenFlow[In, Out] = OpenFlow(transform)
protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] =
OpenInputFlow(output, t ++ transform)
}
final case class OpenOutputFlow[-In, +Out](input: Input[In], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] {
override type Repr[-In, +Out] = OpenOutputFlow[In, Out]
type AfterCloseOutput[-In, +Out] = ClosedFlow[In, Out]
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform)
def withoutInput: OpenFlow[In, Out] = OpenFlow(transform)
protected def appendTransform[T](t: Transform[Out, T]) = OpenOutputFlow(input, transform ++ t)
}
final case class ClosedFlow[-In, +Out](input: Input[In], output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] {
def withoutOutput: OpenOutputFlow[In, Out] = OpenOutputFlow(input, transform)
def withoutInput: OpenInputFlow[In, Out] = OpenInputFlow(output, transform)
def run(): Unit = ()
}
trait Transform[-In, +Out] {
def ++[T](t: Transform[Out, T]): Transform[In, T] = EmptyTransform[In, T]()
}
final case class EmptyTransform[-In, +Out]() extends Transform[In, Out]
object FlattenStrategy {
def concatOpenOutputFlow[In, Out]: FlattenStrategy[OpenOutputFlow[In, Out], Out] = ConcatOpenOutputFlow[In, Out]()
def concatOpenFlow[In, Out]: FlattenStrategy[OpenFlow[In, Out], Out] = ConcatOpenFlow[In, Out]()
final case class ConcatOpenOutputFlow[In, Out]() extends FlattenStrategy[OpenOutputFlow[In, Out], Out]
final case class ConcatOpenFlow[In, Out]() extends FlattenStrategy[OpenFlow[In, Out], Out]
}

View file

@ -0,0 +1,25 @@
package akka.stream.dsl
final case class Merge[T, U, V >: T with U]() {
val in1 = new Output[T] {}
val in2 = new Output[U] {}
val out = new Input[V] {}
}
final case class Zip[T, U]() {
val in1 = new Output[T] {}
val in2 = new Output[U] {}
val out = new Input[(T, U)] {}
}
final case class Concat[T, U, V >: T with U]() {
val in1 = new Output[T] {}
val in2 = new Output[U] {}
val out = new Input[V] {}
}
final case class Broadcast[T]() {
val in = new Output[T] {}
val out1 = new Input[T] {}
val out2 = new Input[T] {}
}

View file

@ -0,0 +1,94 @@
package akka.stream.dsl
import org.scalatest.Matchers
import org.scalatest.WordSpec
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.Future
import akka.stream.OverflowStrategy
class CombinatorSpec extends WordSpec with Matchers {
val f = From[Int]
"Linear simple combinators in Flow" should {
"map" in {
val t: OpenFlow[Int, Int] = f.map(_ * 2)
}
"mapFuture" in {
import scala.concurrent.ExecutionContext.Implicits.global
val t: OpenFlow[Int, Int] = f.mapFuture(Future(_))
}
"filter" in {
val t: OpenFlow[Int, Int] = f.filter(_ != 2)
}
"collect" in {
val t: OpenFlow[Int, String] = f.collect {
case i: Int if i == 2 "two"
}
}
"fold" in {
val fo = FoldOut("elements:") { (soFar, element: Int) soFar + element }
val t: OpenInputFlow[Int, Int] = f.withOutput(fo)
}
"drop" in {
val t: OpenFlow[Int, Int] = f.drop(2)
}
"dropWithin" in {
val t: OpenFlow[Int, Int] = f.dropWithin(2.seconds)
}
"take" in {
val t: OpenFlow[Int, Int] = f.take(2)
}
"takeWithin" in {
val t: OpenFlow[Int, Int] = f.takeWithin(2.seconds)
}
"grouped" in {
val t: OpenFlow[Int, immutable.Seq[Int]] = f.grouped(2)
}
"groupedWithin" in {
val t: OpenFlow[Int, immutable.Seq[Int]] = f.groupedWithin(2, 2.seconds)
}
"mapConcat" in {
val t: OpenFlow[Int, Int] = f.mapConcat { i immutable.Seq(i, i, i) }
}
"conflate" in {
val t: OpenFlow[Int, String] = f.conflate(_.toString, (soFar: String, i) soFar + i)
}
"expand" in {
val t: OpenFlow[Int, String] = f.expand(_.toString, (soFar: String) (soFar, "_"))
}
"buffer" in {
val t: OpenFlow[Int, Int] = f.buffer(100, OverflowStrategy.DropHead)
}
}
"Linear combinators which produce multiple flows" should {
"prefixAndTail" in {
val t: OpenFlow[Int, (immutable.Seq[String], OpenOutputFlow[String, String])] =
f.map(_.toString).prefixAndTail(10)
}
"groupBy" in {
val grouped: OpenOutputFlow[Int, (String, OpenOutputFlow[Int, Int])] =
From(immutable.Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) o.toString)
val closedInner: OpenOutputFlow[Int, (String, ClosedFlow[Int, Int])] = grouped.map {
case (key, openFlow) (key, openFlow.withOutput(PublisherOut()))
}
// both of these compile, even if `grouped` has inner flows unclosed
grouped.withOutput(PublisherOut()).run
closedInner.withOutput(PublisherOut()).run
}
"splitWhen" in {
val t: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2)
}
}
"Linear combinators which consume multiple flows" should {
"flatten" in {
val split: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2)
val flattened: OpenFlow[Int, String] = split.flatten(FlattenStrategy.concatOpenOutputFlow)
}
}
}

View file

@ -0,0 +1,141 @@
package akka.stream.dsl
import org.scalatest.{ Matchers, WordSpec }
import scala.collection.immutable.Seq
import scala.concurrent.Future
class FlowSpec extends WordSpec with Matchers {
val intSeq = IterableIn(Seq(1, 2, 3))
val strSeq = IterableIn(Seq("a", "b", "c"))
import scala.concurrent.ExecutionContext.Implicits.global
val intFut = FutureIn(Future { 3 })
"OpenFlow" should {
"go through all states" in {
val f: OpenFlow[Int, Int] = From[Int]
.withInput(intSeq)
.withOutput(PublisherOut())
.withoutInput
.withoutOutput
}
"should not run" in {
val open: OpenFlow[Int, Int] = From[Int]
"open.run" shouldNot compile
}
"accept IterableIn" in {
val f: OpenOutputFlow[Int, Int] = From[Int].withInput(intSeq)
}
"accept FutureIn" in {
val f: OpenOutputFlow[Int, Int] = From[Int].withInput(intFut)
}
"append OpenFlow" in {
val open1: OpenFlow[Int, String] = From[Int].map(_.toString)
val open2: OpenFlow[String, Int] = From[String].map(_.hashCode)
val open3: OpenFlow[Int, Int] = open1.append(open2)
"open3.run" shouldNot compile
val closedInput: OpenOutputFlow[Int, Int] = open3.withInput(intSeq)
"closedInput.run" shouldNot compile
val closedOutput: OpenInputFlow[Int, Int] = open3.withOutput(PublisherOut())
"closedOutput.run" shouldNot compile
closedInput.withOutput(PublisherOut()).run
closedOutput.withInput(intSeq).run
}
"prepend OpenFlow" in {
val open1: OpenFlow[Int, String] = From[Int].map(_.toString)
val open2: OpenFlow[String, Int] = From[String].map(_.hashCode)
val open3: OpenFlow[String, String] = open1.prepend(open2)
"open3.run" shouldNot compile
val closedInput: OpenOutputFlow[String, String] = open3.withInput(strSeq)
"closedInput.run" shouldNot compile
val closedOutput: OpenInputFlow[String, String] = open3.withOutput(PublisherOut())
"closedOutput.run" shouldNot compile
closedInput.withOutput(PublisherOut()).run
closedOutput.withInput(strSeq).run
}
"append OpenInputFlow" in {
val open: OpenFlow[Int, String] = From[Int].map(_.toString)
val closedOutput: OpenInputFlow[String, Int] = From[String].map(_.hashCode).withOutput(PublisherOut())
val appended: OpenInputFlow[Int, Int] = open.append(closedOutput)
"appended.run" shouldNot compile
"appended.toFuture" shouldNot compile
appended.withInput(intSeq).run
}
"prepend OpenOutputFlow" in {
val open: OpenFlow[Int, String] = From[Int].map(_.toString)
val closedInput: OpenOutputFlow[String, Int] = From[String].map(_.hashCode).withInput(strSeq)
val prepended: OpenOutputFlow[String, String] = open.prepend(closedInput)
"prepended.run" shouldNot compile
"prepended.withInput(strSeq)" shouldNot compile
prepended.withOutput(PublisherOut()).run
}
}
"OpenInputFlow" should {
val openInput: OpenInputFlow[Int, String] =
From[Int].map(_.toString).withOutput(PublisherOut())
"accept Input" in {
openInput.withInput(intSeq)
}
"drop Output" in {
openInput.withoutOutput
}
"not drop Input" in {
"openInput.withoutInput" shouldNot compile
}
"not accept Output" in {
"openInput.ToFuture" shouldNot compile
}
"not run" in {
"openInput.run" shouldNot compile
}
}
"OpenOutputFlow" should {
val openOutput: OpenOutputFlow[Int, String] =
From(Seq(1, 2, 3)).map(_.toString)
"accept Output" in {
openOutput.withOutput(PublisherOut())
}
"drop Input" in {
openOutput.withoutInput
}
"not drop Output" in {
"openOutput.withoutOutput" shouldNot compile
}
"not accept Input" in {
"openOutput.withInput(intSeq)" shouldNot compile
}
"not run" in {
"openOutput.run" shouldNot compile
}
}
"ClosedFlow" should {
val closed: ClosedFlow[Int, String] =
From(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut())
"run" in {
closed.run
}
"drop Input" in {
closed.withoutInput
}
"drop Output" in {
closed.withoutOutput
}
"not accept Input" in {
"closed.withInput(intSeq)" shouldNot compile
}
"not accept Output" in {
"closed.ToFuture" shouldNot compile
}
}
}

View file

@ -0,0 +1,57 @@
package akka.stream.dsl
import org.scalatest.{ WordSpec, Matchers }
import scala.collection.immutable.Seq
class GraphSpec extends WordSpec with Matchers {
"Graph" should {
"merge" in {
val merge = Merge[Int, Int, Int]()
val in1 = From[Int].withOutput(merge.in1)
val in2 = From[Int].withOutput(merge.in2)
val out1 = From[Int].withInput(merge.out)
val out2 = From[String]
// FIXME: make me not compile
//"out2.withInput(merge.out)" shouldNot compile
}
"zip" in {
val zip = Zip[Int, String]()
val in1 = From[Int].withOutput(zip.in1)
val in2 = From[String].withOutput(zip.in2)
val out1 = From[(Int, String)].withInput(zip.out)
val out2 = From[(String, Int)]
// FIXME: make me not compile
//"out2.withInput(zip.out)" shouldNot compile
}
"concat" in {
trait A
trait B extends A
val concat = Concat[A, B, A]()
val in1 = From[A].withOutput(concat.in1)
val in2 = From[B].withOutput(concat.in2)
val out1 = From[A].withInput(concat.out)
val out2 = From[String]
// FIXME: make me not compile
//"out2.withInput(concat.out)" shouldNot compile
}
"broadcast" in {
val broadcast = Broadcast[Int]()
val in1 = From[Int].withOutput(broadcast.in)
val in2 = From[Int].withInput(broadcast.out1)
val out1 = From[Int].withInput(broadcast.out2)
val out2 = From[String]
// FIXME: make me not compile
//"out2.withInput(broadcast.out2)" shouldNot compile
}
}
}