Merge pull request #15765 from akka/wip-15744-newdsl-transform-patriknw
=str #15744 Hook-up transform to the AST nodes
This commit is contained in:
commit
a25d0fcde9
13 changed files with 1042 additions and 660 deletions
|
|
@ -1,114 +0,0 @@
|
|||
package akka.http.server
|
||||
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.http.Http
|
||||
import akka.http.model.{ ErrorInfo, HttpRequest, HttpResponse }
|
||||
import akka.http.parsing.HttpRequestParser
|
||||
import akka.http.parsing.ParserOutput._
|
||||
import akka.http.rendering.ResponseRenderingContext
|
||||
import akka.stream.dsl._
|
||||
import akka.stream.io.StreamTcp
|
||||
import akka.stream.{ FlowMaterializer, Transformer }
|
||||
import akka.util.ByteString
|
||||
|
||||
class NewDslHttpServerPipeline(settings: ServerSettings,
|
||||
materializer: FlowMaterializer,
|
||||
log: LoggingAdapter) {
|
||||
|
||||
import akka.http.server.NewDslHttpServerPipeline._
|
||||
|
||||
val rootParser = new HttpRequestParser(settings.parserSettings, settings.rawRequestUriHeader, materializer)()
|
||||
val warnOnIllegalHeader: ErrorInfo ⇒ Unit = errorInfo ⇒
|
||||
if (settings.parserSettings.illegalHeaderWarnings)
|
||||
log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty)
|
||||
|
||||
val responseRendererFactory = new {
|
||||
def newRenderer: Transformer[ResponseRenderingContext, OpenOutputFlow[ByteString, ByteString]] = ???
|
||||
}
|
||||
|
||||
/**
|
||||
* Flow graph:
|
||||
*
|
||||
* tcpConn.inputStream ---> requestFlowBeforeBroadcast -+-> requestFlowAfterBroadcast ---> Publisher[HttpRequest]
|
||||
* |
|
||||
* \-> applicationBypassFlow -\
|
||||
* |
|
||||
* Subscriber[HttpResponse] ---> responseFlowBeforeMerge -+-> responseFlowAfterMerge --> tcpConn.outputStream
|
||||
*/
|
||||
def apply(tcpConn: StreamTcp.IncomingTcpConnection) = {
|
||||
|
||||
val broadcast = Broadcast[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]()
|
||||
val merge = Merge[MessageStart, HttpResponse, Any]()
|
||||
|
||||
val requestFlowBeforeBroadcast: ClosedFlow[ByteString, (RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] =
|
||||
From(tcpConn.inputStream)
|
||||
.transform(rootParser.copyWith(warnOnIllegalHeader))
|
||||
.splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd)
|
||||
.headAndTail()
|
||||
.withOutput(broadcast.in)
|
||||
|
||||
val applicationBypassFlow: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), MessageStart] =
|
||||
From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]
|
||||
.withInput(broadcast.out1)
|
||||
.collect[MessageStart with RequestOutput] { case (x: MessageStart, _) ⇒ x }
|
||||
.withOutput(merge.in1)
|
||||
|
||||
val requestPublisher = PublisherOut[HttpRequest]()
|
||||
val requestFlowAfterBroadcast: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), HttpRequest] =
|
||||
From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]
|
||||
.withInput(broadcast.out2)
|
||||
.collect {
|
||||
case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts) ⇒
|
||||
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader)
|
||||
val publisher = PublisherOut[RequestOutput]()
|
||||
val flow = entityParts.withOutput(publisher)
|
||||
HttpRequest(method, effectiveUri, headers, createEntity(publisher.publisher), protocol)
|
||||
}
|
||||
.withOutput(requestPublisher)
|
||||
|
||||
val responseSubscriber = SubscriberIn[HttpResponse]()
|
||||
val responseFlowBeforeMerge: ClosedFlow[HttpResponse, HttpResponse] =
|
||||
From[HttpResponse]
|
||||
.withInput(responseSubscriber)
|
||||
.withOutput(merge.in2)
|
||||
|
||||
val responseFlowAfterMerge: ClosedFlow[Any, ByteString] =
|
||||
From[Any]
|
||||
.withInput(merge.out)
|
||||
.transform(applyApplicationBypass)
|
||||
.transform(responseRendererFactory.newRenderer)
|
||||
.flatten(FlattenStrategy.concatOpenOutputFlow)
|
||||
.transform(errorLogger(log, "Outgoing response stream error"))
|
||||
.withOutput(SubscriberOut(tcpConn.outputStream))
|
||||
|
||||
Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher.publisher, responseSubscriber.subscriber)
|
||||
}
|
||||
|
||||
def applyApplicationBypass: Transformer[Any, ResponseRenderingContext] = ???
|
||||
|
||||
private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = ???
|
||||
}
|
||||
|
||||
object NewDslHttpServerPipeline {
|
||||
|
||||
/**
|
||||
* FIXME: We can't use `HasOpenOutput` here, because conversion would convert either `OpenFlow`
|
||||
* or `OpenOutputFlow` to `HasOpenOutput`.
|
||||
*
|
||||
* Therefore we need two separate conversions, one for `OpeFlow` another for `OpenOutputFlow`.
|
||||
*/
|
||||
implicit class OpenOutputFlowWithHeadAndTail[In, InnerIn, InnerOut](val underlying: OpenOutputFlow[In, OpenOutputFlow[InnerIn, InnerOut]]) extends AnyVal {
|
||||
def headAndTail(): OpenOutputFlow[In, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] = {
|
||||
val flow: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], OpenOutputFlow[InnerIn, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])]] =
|
||||
From[OpenOutputFlow[InnerIn, InnerOut]]
|
||||
.map { f ⇒
|
||||
f.prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) }
|
||||
}
|
||||
|
||||
val flattened: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] =
|
||||
flow.flatten(FlattenStrategy.concatOpenOutputFlow)
|
||||
|
||||
underlying.append(flattened)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,228 +0,0 @@
|
|||
package akka.stream.dsl
|
||||
|
||||
import akka.stream.impl.Ast
|
||||
import org.reactivestreams.{ Subscriber, Publisher }
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.stream.{ Transformer, OverflowStrategy, FlattenStrategy }
|
||||
|
||||
sealed trait Flow[-In, +Out] {
|
||||
val transform: Transform[In, Out]
|
||||
}
|
||||
|
||||
object From {
|
||||
/**
|
||||
* Helper to create Flow without Input.
|
||||
* Example usage: From[Int]
|
||||
*/
|
||||
def apply[T]: OpenFlow[T, T] = OpenFlow[T, T](EmptyTransform[T, T]())
|
||||
|
||||
/**
|
||||
* Helper to create Flow with Input from Iterable.
|
||||
* Example usage: Flow(Seq(1,2,3))
|
||||
*/
|
||||
def apply[T](i: immutable.Iterable[T]): OpenOutputFlow[T, T] = From[T].withInput(IterableIn(i))
|
||||
|
||||
/**
|
||||
* Helper to create Flow with Input from Future.
|
||||
* Example usage: Flow(Future { 1 })
|
||||
*/
|
||||
def apply[T](f: Future[T]): OpenOutputFlow[T, T] = From[T].withInput(FutureIn(f))
|
||||
|
||||
/**
|
||||
* Helper to create Flow with Input from Publisher.
|
||||
*/
|
||||
def apply[T](p: Publisher[T]): OpenOutputFlow[T, T] = From[T].withInput(PublisherIn(p))
|
||||
}
|
||||
|
||||
trait Input[-In]
|
||||
|
||||
/**
|
||||
* Default input.
|
||||
* Allows to materialize a Flow with this input to Subscriber.
|
||||
*/
|
||||
final case class SubscriberIn[-In]() extends Input[In] {
|
||||
def subscriber[I <: In]: Subscriber[I] = ???
|
||||
}
|
||||
|
||||
/**
|
||||
* Input from Publisher.
|
||||
*/
|
||||
final case class PublisherIn[-In](p: Publisher[_ >: In]) extends Input[In]
|
||||
|
||||
/**
|
||||
* Input from Iterable
|
||||
*
|
||||
* Changing In from Contravariant to Covariant is needed because Iterable[+A].
|
||||
* But this brakes IterableIn variance and we get IterableIn(Seq(1,2,3)): IterableIn[Any]
|
||||
*/
|
||||
final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[In]
|
||||
|
||||
/**
|
||||
* Input from Future
|
||||
*
|
||||
* Changing In from Contravariant to Covariant is needed because Future[+A].
|
||||
* But this brakes FutureIn variance and we get FutureIn(Future{1}): FutureIn[Any]
|
||||
*/
|
||||
final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In]
|
||||
|
||||
trait Output[+Out]
|
||||
|
||||
/**
|
||||
* Default output.
|
||||
* Allows to materialize a Flow with this output to Publisher.
|
||||
*/
|
||||
final case class PublisherOut[+Out]() extends Output[Out] {
|
||||
def publisher[O >: Out]: Publisher[O] = ???
|
||||
}
|
||||
|
||||
/**
|
||||
* Output to a Subscriber.
|
||||
*/
|
||||
final case class SubscriberOut[+Out](s: Subscriber[_ <: Out]) extends Output[Out]
|
||||
|
||||
/**
|
||||
* Fold output. Reduces output stream according to the given fold function.
|
||||
*/
|
||||
final case class FoldOut[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Output[Out] {
|
||||
def future: Future[T] = ???
|
||||
}
|
||||
|
||||
/**
|
||||
* Operations with a Flow which has open (no attached) Input.
|
||||
*
|
||||
* No Out type parameter would be useful for Graph signatures, but we need it here
|
||||
* for `withInput` and `prependTransform` methods.
|
||||
*/
|
||||
sealed trait HasOpenInput[-In, +Out] {
|
||||
type Repr[-In, +Out] <: HasOpenInput[In, Out]
|
||||
type AfterCloseInput[-In, +Out] <: Flow[In, Out]
|
||||
|
||||
def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out]
|
||||
protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out]
|
||||
|
||||
// linear combinators with flows
|
||||
def prepend[T](f: OpenFlow[T, In]): Repr[T, Out] =
|
||||
prependTransform(f.transform)
|
||||
def prepend[T](f: OpenOutputFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] =
|
||||
prependTransform(f.transform).withInput(f.input)
|
||||
}
|
||||
|
||||
/**
|
||||
* Operations with a Flow which has open (no attached) Output.
|
||||
*
|
||||
* No In type parameter would be useful for Graph signatures, but we need it here
|
||||
* for `withOutput` and `appendTransform` methods.
|
||||
*/
|
||||
trait HasOpenOutput[-In, +Out] {
|
||||
type Repr[-In, +Out] <: HasOpenOutput[In, Out]
|
||||
type AfterCloseOutput[-In, +Out] <: Flow[In, Out]
|
||||
|
||||
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O]
|
||||
protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T]
|
||||
|
||||
// linear simple combinators
|
||||
def map[T](f: Out ⇒ T): Repr[In, T] =
|
||||
appendTransform(EmptyTransform[Out, T]())
|
||||
def mapFuture[T](f: Out ⇒ Future[T]): Repr[In, T] =
|
||||
appendTransform(EmptyTransform[Out, T]())
|
||||
def filter(p: Out ⇒ Boolean): Repr[In, Out] =
|
||||
appendTransform(EmptyTransform[Out, Out]())
|
||||
def collect[T](pf: PartialFunction[Out, T]): Repr[In, T] =
|
||||
appendTransform(EmptyTransform[Out, T]())
|
||||
def drop(n: Int): Repr[In, Out] =
|
||||
appendTransform(EmptyTransform[Out, Out]())
|
||||
def dropWithin(d: FiniteDuration): Repr[In, Out] =
|
||||
appendTransform(EmptyTransform[Out, Out]())
|
||||
def take(n: Int): Repr[In, Out] =
|
||||
appendTransform(EmptyTransform[Out, Out]())
|
||||
def takeWithin(d: FiniteDuration): Repr[In, Out] =
|
||||
appendTransform(EmptyTransform[Out, Out]())
|
||||
def grouped(n: Int): Repr[In, immutable.Seq[Out]] =
|
||||
appendTransform(EmptyTransform[Out, immutable.Seq[Out]]())
|
||||
def groupedWithin(n: Int, d: FiniteDuration): Repr[In, immutable.Seq[Out]] =
|
||||
appendTransform(EmptyTransform[Out, immutable.Seq[Out]]())
|
||||
def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[In, T] =
|
||||
appendTransform(EmptyTransform[Out, T]())
|
||||
def transform[T](transformer: Transformer[Out, T]): Repr[In, T] =
|
||||
appendTransform(EmptyTransform[Out, T]())
|
||||
def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[In, S] =
|
||||
appendTransform(EmptyTransform[Out, S]())
|
||||
def expand[S, O](seed: Out ⇒ S, extrapolate: S ⇒ (O, S)): Repr[In, O] =
|
||||
appendTransform(EmptyTransform[Out, O]())
|
||||
def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[In, Out] =
|
||||
appendTransform(EmptyTransform[Out, Out]())
|
||||
|
||||
// linear combinators which produce multiple flows
|
||||
def prefixAndTail[O >: Out](n: Int): Repr[In, (immutable.Seq[O], OpenOutputFlow[O, O])] =
|
||||
appendTransform(EmptyTransform[Out, (immutable.Seq[O], OpenOutputFlow[O, O])]())
|
||||
def groupBy[O >: Out, K](f: O ⇒ K): Repr[In, (K, OpenOutputFlow[O, O])] =
|
||||
appendTransform(EmptyTransform[Out, (K, OpenOutputFlow[O, O])]())
|
||||
def splitWhen[O >: Out](p: Out ⇒ Boolean): Repr[In, OpenOutputFlow[O, O]] =
|
||||
appendTransform(EmptyTransform[Out, OpenOutputFlow[O, O]]())
|
||||
|
||||
// linear combinators which consume multiple flows
|
||||
def flatten[T](strategy: FlattenStrategy[Out, T]): Repr[In, T] =
|
||||
appendTransform(EmptyTransform[Out, T]())
|
||||
|
||||
// linear combinators with flows
|
||||
def append[T](f: OpenFlow[Out, T]): Repr[In, T] =
|
||||
appendTransform(f.transform)
|
||||
def append[T](f: OpenInputFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] =
|
||||
appendTransform(f.transform).withOutput(f.output)
|
||||
}
|
||||
|
||||
final case class OpenFlow[-In, +Out](transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] with HasOpenInput[In, Out] {
|
||||
override type Repr[-In, +Out] = OpenFlow[In, Out]
|
||||
type AfterCloseOutput[-In, +Out] = OpenInputFlow[In, Out]
|
||||
type AfterCloseInput[-In, +Out] = OpenOutputFlow[In, Out]
|
||||
|
||||
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform)
|
||||
def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = OpenOutputFlow(in, transform)
|
||||
|
||||
protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = OpenFlow(t ++ transform)
|
||||
protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] = OpenFlow(transform ++ t)
|
||||
}
|
||||
|
||||
final case class OpenInputFlow[-In, +Out](output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenInput[In, Out] {
|
||||
type Repr[-In, +Out] = OpenInputFlow[In, Out]
|
||||
type AfterCloseInput[-In, +Out] = ClosedFlow[In, Out]
|
||||
|
||||
def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = ClosedFlow(in, output, transform)
|
||||
def withoutOutput: OpenFlow[In, Out] = OpenFlow(transform)
|
||||
|
||||
protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] =
|
||||
OpenInputFlow(output, t ++ transform)
|
||||
}
|
||||
|
||||
final case class OpenOutputFlow[-In, +Out](input: Input[In], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] {
|
||||
override type Repr[-In, +Out] = OpenOutputFlow[In, Out]
|
||||
type AfterCloseOutput[-In, +Out] = ClosedFlow[In, Out]
|
||||
|
||||
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform)
|
||||
def withoutInput: OpenFlow[In, Out] = OpenFlow(transform)
|
||||
|
||||
protected def appendTransform[T](t: Transform[Out, T]) = OpenOutputFlow(input, transform ++ t)
|
||||
}
|
||||
|
||||
final case class ClosedFlow[-In, +Out](input: Input[In], output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] {
|
||||
def withoutOutput: OpenOutputFlow[In, Out] = OpenOutputFlow(input, transform)
|
||||
def withoutInput: OpenInputFlow[In, Out] = OpenInputFlow(output, transform)
|
||||
|
||||
def run(): Unit = ()
|
||||
}
|
||||
|
||||
trait Transform[-In, +Out] {
|
||||
def ++[T](t: Transform[Out, T]): Transform[In, T] = EmptyTransform[In, T]()
|
||||
}
|
||||
final case class EmptyTransform[-In, +Out]() extends Transform[In, Out]
|
||||
|
||||
object FlattenStrategy {
|
||||
def concatOpenOutputFlow[In, Out]: FlattenStrategy[OpenOutputFlow[In, Out], Out] = ConcatOpenOutputFlow[In, Out]()
|
||||
def concatOpenFlow[In, Out]: FlattenStrategy[OpenFlow[In, Out], Out] = ConcatOpenFlow[In, Out]()
|
||||
|
||||
final case class ConcatOpenOutputFlow[In, Out]() extends FlattenStrategy[OpenOutputFlow[In, Out], Out]
|
||||
final case class ConcatOpenFlow[In, Out]() extends FlattenStrategy[OpenFlow[In, Out], Out]
|
||||
}
|
||||
|
|
@ -1,25 +0,0 @@
|
|||
package akka.stream.dsl
|
||||
|
||||
final case class Merge[T, U, V >: T with U]() {
|
||||
val in1 = new Output[T] {}
|
||||
val in2 = new Output[U] {}
|
||||
val out = new Input[V] {}
|
||||
}
|
||||
|
||||
final case class Zip[T, U]() {
|
||||
val in1 = new Output[T] {}
|
||||
val in2 = new Output[U] {}
|
||||
val out = new Input[(T, U)] {}
|
||||
}
|
||||
|
||||
final case class Concat[T, U, V >: T with U]() {
|
||||
val in1 = new Output[T] {}
|
||||
val in2 = new Output[U] {}
|
||||
val out = new Input[V] {}
|
||||
}
|
||||
|
||||
final case class Broadcast[T]() {
|
||||
val in = new Output[T] {}
|
||||
val out1 = new Input[T] {}
|
||||
val out2 = new Input[T] {}
|
||||
}
|
||||
|
|
@ -45,7 +45,7 @@ private[akka] object ActorProcessor {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class ActorProcessor[I, O] private (impl: ActorRef) extends ActorPublisher[O](impl, None)
|
||||
private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[O](impl, None)
|
||||
with Processor[I, O] {
|
||||
override def onSubscribe(s: Subscription): Unit = impl ! OnSubscribe(s)
|
||||
override def onError(t: Throwable): Unit = impl ! OnError(t)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,171 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.impl2
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import akka.actor.{ Actor, ActorCell, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, LocalActorRef, Props, RepointableActorRef }
|
||||
import akka.pattern.ask
|
||||
import org.reactivestreams.{ Processor, Publisher, Subscriber }
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.{ Await, Future }
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.{ Failure, Success }
|
||||
import akka.stream.Transformer
|
||||
import akka.stream.scaladsl2.FlowMaterializer
|
||||
import akka.stream.MaterializerSettings
|
||||
import akka.stream.impl.EmptyPublisher
|
||||
import akka.stream.impl.ActorPublisher
|
||||
import akka.stream.impl.IterablePublisher
|
||||
import akka.stream.impl.TransformProcessorImpl
|
||||
import akka.stream.impl.ActorProcessor
|
||||
import akka.stream.impl.ExposedPublisher
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object Ast {
|
||||
sealed trait AstNode {
|
||||
def name: String
|
||||
}
|
||||
|
||||
case class Transform(name: String, mkTransformer: () ⇒ Transformer[Any, Any]) extends AstNode
|
||||
|
||||
trait PublisherNode[I] {
|
||||
private[akka] def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I]
|
||||
}
|
||||
|
||||
final case class ExistingPublisher[I](publisher: Publisher[I]) extends PublisherNode[I] {
|
||||
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String) = publisher
|
||||
}
|
||||
|
||||
final case class IterablePublisherNode[I](iterable: immutable.Iterable[I]) extends PublisherNode[I] {
|
||||
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
|
||||
if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]]
|
||||
else ActorPublisher[I](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings),
|
||||
name = s"$flowName-0-iterable"), Some(iterable))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] case class ActorBasedFlowMaterializer(
|
||||
override val settings: MaterializerSettings,
|
||||
supervisor: ActorRef,
|
||||
flowNameCounter: AtomicLong,
|
||||
namePrefix: String)
|
||||
extends FlowMaterializer(settings) {
|
||||
import akka.stream.impl2.Ast._
|
||||
|
||||
def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name)
|
||||
|
||||
private def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()
|
||||
|
||||
private def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
|
||||
|
||||
@tailrec private def processorChain(topSubscriber: Subscriber[_], ops: immutable.Seq[AstNode],
|
||||
flowName: String, n: Int): Subscriber[_] = {
|
||||
ops match {
|
||||
case op :: tail ⇒
|
||||
val opProcessor: Processor[Any, Any] = processorForNode(op, flowName, n)
|
||||
opProcessor.subscribe(topSubscriber.asInstanceOf[Subscriber[Any]])
|
||||
processorChain(opProcessor, tail, flowName, n - 1)
|
||||
case _ ⇒ topSubscriber
|
||||
}
|
||||
}
|
||||
|
||||
// Ops come in reverse order
|
||||
override def toPublisher[I, O](publisherNode: PublisherNode[I], ops: List[AstNode]): Publisher[O] = {
|
||||
val flowName = createFlowName()
|
||||
if (ops.isEmpty) publisherNode.createPublisher(this, flowName).asInstanceOf[Publisher[O]]
|
||||
else {
|
||||
val opsSize = ops.size
|
||||
val opProcessor = processorForNode(ops.head, flowName, opsSize)
|
||||
val topSubscriber = processorChain(opProcessor, ops.tail, flowName, opsSize - 1)
|
||||
publisherNode.createPublisher(this, flowName).subscribe(topSubscriber.asInstanceOf[Subscriber[I]])
|
||||
opProcessor.asInstanceOf[Publisher[O]]
|
||||
}
|
||||
}
|
||||
|
||||
private val identityTransform = Transform("identity", () ⇒
|
||||
new Transformer[Any, Any] {
|
||||
override def onNext(element: Any) = List(element)
|
||||
})
|
||||
|
||||
def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
|
||||
val impl = actorOf(ActorProcessorFactory.props(settings, op), s"$flowName-$n-${op.name}")
|
||||
ActorProcessorFactory(impl)
|
||||
}
|
||||
|
||||
def actorOf(props: Props, name: String): ActorRef = supervisor match {
|
||||
case ref: LocalActorRef ⇒
|
||||
ref.underlying.attachChild(props, name, systemService = false)
|
||||
case ref: RepointableActorRef ⇒
|
||||
if (ref.isStarted)
|
||||
ref.underlying.asInstanceOf[ActorCell].attachChild(props, name, systemService = false)
|
||||
else {
|
||||
implicit val timeout = ref.system.settings.CreationTimeout
|
||||
val f = (supervisor ? StreamSupervisor.Materialize(props, name)).mapTo[ActorRef]
|
||||
Await.result(f, timeout.duration)
|
||||
}
|
||||
case _ ⇒
|
||||
throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object FlowNameCounter extends ExtensionId[FlowNameCounter] with ExtensionIdProvider {
|
||||
override def get(system: ActorSystem): FlowNameCounter = super.get(system)
|
||||
override def lookup = FlowNameCounter
|
||||
override def createExtension(system: ExtendedActorSystem): FlowNameCounter = new FlowNameCounter
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class FlowNameCounter extends Extension {
|
||||
val counter = new AtomicLong(0)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object StreamSupervisor {
|
||||
def props(settings: MaterializerSettings): Props = Props(new StreamSupervisor(settings))
|
||||
|
||||
case class Materialize(props: Props, name: String)
|
||||
}
|
||||
|
||||
private[akka] class StreamSupervisor(settings: MaterializerSettings) extends Actor {
|
||||
import StreamSupervisor._
|
||||
|
||||
def receive = {
|
||||
case Materialize(props, name) ⇒
|
||||
val impl = context.actorOf(props, name)
|
||||
sender() ! impl
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object ActorProcessorFactory {
|
||||
|
||||
import Ast._
|
||||
def props(settings: MaterializerSettings, op: AstNode): Props =
|
||||
(op match {
|
||||
case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer()))
|
||||
}).withDispatcher(settings.dispatcher)
|
||||
|
||||
def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = {
|
||||
val p = new ActorProcessor[I, O](impl)
|
||||
impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]])
|
||||
p
|
||||
}
|
||||
}
|
||||
218
akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala
Normal file
218
akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala
Normal file
|
|
@ -0,0 +1,218 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.scaladsl2
|
||||
|
||||
import scala.language.higherKinds
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import org.reactivestreams.Publisher
|
||||
import org.reactivestreams.Subscriber
|
||||
import akka.stream.Transformer
|
||||
import akka.stream.impl.BlackholeSubscriber
|
||||
import akka.stream.impl2.Ast._
|
||||
|
||||
sealed trait Flow
|
||||
|
||||
object FlowFrom {
|
||||
/**
|
||||
* Helper to create `Flow` without [[Input]].
|
||||
* Example usage: `FlowFrom[Int]`
|
||||
*/
|
||||
def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil)
|
||||
|
||||
/**
|
||||
* Helper to create `Flow` with Input from `Iterable`.
|
||||
* Example usage: `FlowFrom(Seq(1,2,3))`
|
||||
*/
|
||||
def apply[T](i: immutable.Iterable[T]): PublisherFlow[T, T] = FlowFrom[T].withInput(IterableIn(i))
|
||||
|
||||
/**
|
||||
* Helper to create `Flow` with [[Input]] from `Publisher`.
|
||||
*/
|
||||
def apply[T](p: Publisher[T]): PublisherFlow[T, T] = FlowFrom[T].withInput(PublisherIn(p))
|
||||
}
|
||||
|
||||
trait Input[-In]
|
||||
|
||||
/**
|
||||
* Default input.
|
||||
* Allows to materialize a Flow with this input to Subscriber.
|
||||
*/
|
||||
final case class SubscriberIn[-In]() extends Input[In] {
|
||||
def subscriber[I <: In]: Subscriber[I] = ???
|
||||
}
|
||||
|
||||
/**
|
||||
* Input from Publisher.
|
||||
*/
|
||||
final case class PublisherIn[-In](p: Publisher[_ >: In]) extends Input[In]
|
||||
|
||||
/**
|
||||
* Input from Iterable
|
||||
*
|
||||
* Changing In from Contravariant to Covariant is needed because Iterable[+A].
|
||||
* But this brakes IterableIn variance and we get IterableIn(Seq(1,2,3)): IterableIn[Any]
|
||||
*/
|
||||
final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[In]
|
||||
|
||||
/**
|
||||
* Input from Future
|
||||
*
|
||||
* Changing In from Contravariant to Covariant is needed because Future[+A].
|
||||
* But this brakes FutureIn variance and we get FutureIn(Future{1}): FutureIn[Any]
|
||||
*/
|
||||
final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In]
|
||||
|
||||
trait Output[+Out]
|
||||
|
||||
/**
|
||||
* Default output.
|
||||
* Allows to materialize a Flow with this output to Publisher.
|
||||
*/
|
||||
final case class PublisherOut[+Out]() extends Output[Out] {
|
||||
def publisher[O >: Out]: Publisher[O] = ???
|
||||
}
|
||||
|
||||
final case class BlackholeOut[+Out]() extends Output[Out] {
|
||||
def publisher[O >: Out]: Publisher[O] = ???
|
||||
}
|
||||
|
||||
/**
|
||||
* Output to a Subscriber.
|
||||
*/
|
||||
final case class SubscriberOut[+Out](s: Subscriber[_ <: Out]) extends Output[Out]
|
||||
|
||||
/**
|
||||
* Fold output. Reduces output stream according to the given fold function.
|
||||
*/
|
||||
final case class FoldOut[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Output[Out] {
|
||||
def future: Future[T] = ???
|
||||
}
|
||||
|
||||
/**
|
||||
* Operations with a Flow which has open (no attached) Input.
|
||||
*
|
||||
* No Out type parameter would be useful for Graph signatures, but we need it here
|
||||
* for `withInput` and `prependTransform` methods.
|
||||
*/
|
||||
sealed trait HasOpenInput[-In, +Out] extends Flow {
|
||||
type Repr[-In, +Out] <: HasOpenInput[In, Out]
|
||||
type AfterCloseInput[-In, +Out] <: Flow
|
||||
|
||||
def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out]
|
||||
|
||||
def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out]
|
||||
def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out]
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Operations with a Flow which has open (no attached) Output.
|
||||
*
|
||||
* No In type parameter would be useful for Graph signatures, but we need it here
|
||||
* for `withOutput`.
|
||||
*/
|
||||
trait HasOpenOutput[-In, +Out] extends Flow {
|
||||
type Repr[-In, +Out] <: HasOpenOutput[In, Out]
|
||||
type AfterCloseOutput[-In, +Out] <: Flow
|
||||
|
||||
// Storing ops in reverse order
|
||||
protected def andThen[U](op: AstNode): Repr[In, U]
|
||||
|
||||
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O]
|
||||
|
||||
def map[T](f: Out ⇒ T): Repr[In, T] =
|
||||
transform("map", () ⇒ new Transformer[Out, T] {
|
||||
override def onNext(in: Out) = List(f(in))
|
||||
})
|
||||
|
||||
def transform[T](name: String, mkTransformer: () ⇒ Transformer[Out, T]): Repr[In, T] = {
|
||||
andThen(Transform(name, mkTransformer.asInstanceOf[() ⇒ Transformer[Any, Any]]))
|
||||
}
|
||||
|
||||
def append[T](f: ProcessorFlow[Out, T]): Repr[In, T]
|
||||
def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T]
|
||||
}
|
||||
|
||||
/**
|
||||
* Flow without attached input and without attached output, can be used as a `Processor`.
|
||||
*/
|
||||
final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends HasOpenOutput[In, Out] with HasOpenInput[In, Out] {
|
||||
override type Repr[-In, +Out] = ProcessorFlow[In, Out]
|
||||
type AfterCloseOutput[-In, +Out] = SubscriberFlow[In, Out]
|
||||
type AfterCloseInput[-In, +Out] = PublisherFlow[In, Out]
|
||||
|
||||
override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops)
|
||||
|
||||
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = SubscriberFlow(out, ops)
|
||||
def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = PublisherFlow(in, ops)
|
||||
|
||||
override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] =
|
||||
ProcessorFlow(ops ::: f.ops)
|
||||
override def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] =
|
||||
PublisherFlow(f.input, ops ::: f.ops)
|
||||
|
||||
override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = ProcessorFlow(f.ops ++: ops)
|
||||
override def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] =
|
||||
SubscriberFlow(f.output, f.ops ++: ops)
|
||||
}
|
||||
|
||||
/**
|
||||
* Flow with attached output, can be used as a `Subscriber`.
|
||||
*/
|
||||
final case class SubscriberFlow[-In, +Out](output: Output[Out], ops: List[AstNode]) extends HasOpenInput[In, Out] {
|
||||
type Repr[-In, +Out] = SubscriberFlow[In, Out]
|
||||
type AfterCloseInput[-In, +Out] = RunnableFlow[In, Out]
|
||||
|
||||
def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = RunnableFlow(in, output, ops)
|
||||
def withoutOutput: ProcessorFlow[In, Out] = ProcessorFlow(ops)
|
||||
|
||||
override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] =
|
||||
SubscriberFlow(output, ops ::: f.ops)
|
||||
override def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] =
|
||||
RunnableFlow(f.input, output, ops ::: f.ops)
|
||||
}
|
||||
|
||||
/**
|
||||
* Flow with attached input, can be used as a `Publisher`.
|
||||
*/
|
||||
final case class PublisherFlow[-In, +Out](input: Input[In], ops: List[AstNode]) extends HasOpenOutput[In, Out] {
|
||||
override type Repr[-In, +Out] = PublisherFlow[In, Out]
|
||||
type AfterCloseOutput[-In, +Out] = RunnableFlow[In, Out]
|
||||
|
||||
override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops)
|
||||
|
||||
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = RunnableFlow(input, out, ops)
|
||||
def withoutInput: ProcessorFlow[In, Out] = ProcessorFlow(ops)
|
||||
|
||||
override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = PublisherFlow(input, f.ops ++: ops)
|
||||
override def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] =
|
||||
RunnableFlow(input, f.output, f.ops ++: ops)
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Flow with attached input and output, can be executed.
|
||||
*/
|
||||
final case class RunnableFlow[-In, +Out](input: Input[In], output: Output[Out], ops: List[AstNode]) extends Flow {
|
||||
def withoutOutput: PublisherFlow[In, Out] = PublisherFlow(input, ops)
|
||||
def withoutInput: SubscriberFlow[In, Out] = SubscriberFlow(output, ops)
|
||||
|
||||
// FIXME
|
||||
def run()(implicit materializer: FlowMaterializer): Unit =
|
||||
produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize))
|
||||
|
||||
// FIXME replace with run and input/output factories
|
||||
def toPublisher[U >: Out]()(implicit materializer: FlowMaterializer): Publisher[U] =
|
||||
input match {
|
||||
case PublisherIn(p) ⇒ materializer.toPublisher(ExistingPublisher(p), ops)
|
||||
case IterableIn(iter) ⇒ materializer.toPublisher(IterablePublisherNode(iter), ops)
|
||||
case _ ⇒ ???
|
||||
}
|
||||
|
||||
def produceTo(subscriber: Subscriber[_ >: Out])(implicit materializer: FlowMaterializer): Unit =
|
||||
toPublisher().subscribe(subscriber.asInstanceOf[Subscriber[Out]])
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.scaladsl2
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.actor.ActorRefFactory
|
||||
import akka.stream.impl2.ActorBasedFlowMaterializer
|
||||
import akka.stream.impl2.Ast
|
||||
import org.reactivestreams.{ Publisher, Subscriber }
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor.Deploy
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.ActorContext
|
||||
import akka.stream.impl2.StreamSupervisor
|
||||
import akka.stream.impl2.FlowNameCounter
|
||||
import akka.stream.MaterializerSettings
|
||||
|
||||
object FlowMaterializer {
|
||||
|
||||
/**
|
||||
* Scala API: Creates a FlowMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
|
||||
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
|
||||
* will be used to create these actors, therefore it is *forbidden* to pass this object
|
||||
* to another actor if the factory is an ActorContext.
|
||||
*
|
||||
* The `namePrefix` is used as the first part of the names of the actors running
|
||||
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
|
||||
* `namePrefix-flowNumber-flowStepNumber-stepName`.
|
||||
*/
|
||||
def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = {
|
||||
val system = context match {
|
||||
case s: ExtendedActorSystem ⇒ s
|
||||
case c: ActorContext ⇒ c.system
|
||||
case null ⇒ throw new IllegalArgumentException("ActorRefFactory context must be defined")
|
||||
case _ ⇒ throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, " +
|
||||
"got [${_contex.getClass.getName}]")
|
||||
}
|
||||
|
||||
new ActorBasedFlowMaterializer(
|
||||
settings,
|
||||
context.actorOf(StreamSupervisor.props(settings).withDispatcher(settings.dispatcher)),
|
||||
FlowNameCounter(system).counter,
|
||||
namePrefix.getOrElse("flow"))
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API: Creates a FlowMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
|
||||
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
|
||||
* will be used to create these actors, therefore it is *forbidden* to pass this object
|
||||
* to another actor if the factory is an ActorContext.
|
||||
*/
|
||||
def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer =
|
||||
apply(settings)(context)
|
||||
}
|
||||
|
||||
/**
|
||||
* A FlowMaterializer takes the list of transformations comprising a
|
||||
* [[akka.stream.scaladsl.Flow]] and materializes them in the form of
|
||||
* [[org.reactivestreams.Processor]] instances. How transformation
|
||||
* steps are split up into asynchronous regions is implementation
|
||||
* dependent.
|
||||
*/
|
||||
abstract class FlowMaterializer(val settings: MaterializerSettings) {
|
||||
|
||||
/**
|
||||
* The `namePrefix` is used as the first part of the names of the actors running
|
||||
* the processing steps.
|
||||
*/
|
||||
def withNamePrefix(name: String): FlowMaterializer
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* ops are stored in reverse order
|
||||
*/
|
||||
private[akka] def toPublisher[I, O](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Publisher[O]
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -1,94 +0,0 @@
|
|||
package akka.stream.dsl
|
||||
|
||||
import org.scalatest.Matchers
|
||||
import org.scalatest.WordSpec
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Future
|
||||
import akka.stream.OverflowStrategy
|
||||
|
||||
class CombinatorSpec extends WordSpec with Matchers {
|
||||
val f = From[Int]
|
||||
|
||||
"Linear simple combinators in Flow" should {
|
||||
"map" in {
|
||||
val t: OpenFlow[Int, Int] = f.map(_ * 2)
|
||||
}
|
||||
"mapFuture" in {
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
val t: OpenFlow[Int, Int] = f.mapFuture(Future(_))
|
||||
}
|
||||
"filter" in {
|
||||
val t: OpenFlow[Int, Int] = f.filter(_ != 2)
|
||||
}
|
||||
"collect" in {
|
||||
val t: OpenFlow[Int, String] = f.collect {
|
||||
case i: Int if i == 2 ⇒ "two"
|
||||
}
|
||||
}
|
||||
"fold" in {
|
||||
val fo = FoldOut("elements:") { (soFar, element: Int) ⇒ soFar + element }
|
||||
val t: OpenInputFlow[Int, Int] = f.withOutput(fo)
|
||||
}
|
||||
"drop" in {
|
||||
val t: OpenFlow[Int, Int] = f.drop(2)
|
||||
}
|
||||
"dropWithin" in {
|
||||
val t: OpenFlow[Int, Int] = f.dropWithin(2.seconds)
|
||||
}
|
||||
"take" in {
|
||||
val t: OpenFlow[Int, Int] = f.take(2)
|
||||
}
|
||||
"takeWithin" in {
|
||||
val t: OpenFlow[Int, Int] = f.takeWithin(2.seconds)
|
||||
}
|
||||
"grouped" in {
|
||||
val t: OpenFlow[Int, immutable.Seq[Int]] = f.grouped(2)
|
||||
}
|
||||
"groupedWithin" in {
|
||||
val t: OpenFlow[Int, immutable.Seq[Int]] = f.groupedWithin(2, 2.seconds)
|
||||
}
|
||||
"mapConcat" in {
|
||||
val t: OpenFlow[Int, Int] = f.mapConcat { i ⇒ immutable.Seq(i, i, i) }
|
||||
}
|
||||
"conflate" in {
|
||||
val t: OpenFlow[Int, String] = f.conflate(_.toString, (soFar: String, i) ⇒ soFar + i)
|
||||
}
|
||||
"expand" in {
|
||||
val t: OpenFlow[Int, String] = f.expand(_.toString, (soFar: String) ⇒ (soFar, "_"))
|
||||
}
|
||||
"buffer" in {
|
||||
val t: OpenFlow[Int, Int] = f.buffer(100, OverflowStrategy.DropHead)
|
||||
}
|
||||
}
|
||||
|
||||
"Linear combinators which produce multiple flows" should {
|
||||
"prefixAndTail" in {
|
||||
val t: OpenFlow[Int, (immutable.Seq[String], OpenOutputFlow[String, String])] =
|
||||
f.map(_.toString).prefixAndTail(10)
|
||||
}
|
||||
"groupBy" in {
|
||||
val grouped: OpenOutputFlow[Int, (String, OpenOutputFlow[Int, Int])] =
|
||||
From(immutable.Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o.toString)
|
||||
|
||||
val closedInner: OpenOutputFlow[Int, (String, ClosedFlow[Int, Int])] = grouped.map {
|
||||
case (key, openFlow) ⇒ (key, openFlow.withOutput(PublisherOut()))
|
||||
}
|
||||
|
||||
// both of these compile, even if `grouped` has inner flows unclosed
|
||||
grouped.withOutput(PublisherOut()).run
|
||||
closedInner.withOutput(PublisherOut()).run
|
||||
}
|
||||
"splitWhen" in {
|
||||
val t: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2)
|
||||
}
|
||||
}
|
||||
|
||||
"Linear combinators which consume multiple flows" should {
|
||||
"flatten" in {
|
||||
val split: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2)
|
||||
val flattened: OpenFlow[Int, String] = split.flatten(FlattenStrategy.concatOpenOutputFlow)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,141 +0,0 @@
|
|||
package akka.stream.dsl
|
||||
|
||||
import org.scalatest.{ Matchers, WordSpec }
|
||||
|
||||
import scala.collection.immutable.Seq
|
||||
import scala.concurrent.Future
|
||||
|
||||
class FlowSpec extends WordSpec with Matchers {
|
||||
|
||||
val intSeq = IterableIn(Seq(1, 2, 3))
|
||||
val strSeq = IterableIn(Seq("a", "b", "c"))
|
||||
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
val intFut = FutureIn(Future { 3 })
|
||||
|
||||
"OpenFlow" should {
|
||||
"go through all states" in {
|
||||
val f: OpenFlow[Int, Int] = From[Int]
|
||||
.withInput(intSeq)
|
||||
.withOutput(PublisherOut())
|
||||
.withoutInput
|
||||
.withoutOutput
|
||||
}
|
||||
"should not run" in {
|
||||
val open: OpenFlow[Int, Int] = From[Int]
|
||||
"open.run" shouldNot compile
|
||||
}
|
||||
"accept IterableIn" in {
|
||||
val f: OpenOutputFlow[Int, Int] = From[Int].withInput(intSeq)
|
||||
}
|
||||
"accept FutureIn" in {
|
||||
val f: OpenOutputFlow[Int, Int] = From[Int].withInput(intFut)
|
||||
}
|
||||
"append OpenFlow" in {
|
||||
val open1: OpenFlow[Int, String] = From[Int].map(_.toString)
|
||||
val open2: OpenFlow[String, Int] = From[String].map(_.hashCode)
|
||||
val open3: OpenFlow[Int, Int] = open1.append(open2)
|
||||
"open3.run" shouldNot compile
|
||||
|
||||
val closedInput: OpenOutputFlow[Int, Int] = open3.withInput(intSeq)
|
||||
"closedInput.run" shouldNot compile
|
||||
|
||||
val closedOutput: OpenInputFlow[Int, Int] = open3.withOutput(PublisherOut())
|
||||
"closedOutput.run" shouldNot compile
|
||||
|
||||
closedInput.withOutput(PublisherOut()).run
|
||||
closedOutput.withInput(intSeq).run
|
||||
}
|
||||
"prepend OpenFlow" in {
|
||||
val open1: OpenFlow[Int, String] = From[Int].map(_.toString)
|
||||
val open2: OpenFlow[String, Int] = From[String].map(_.hashCode)
|
||||
val open3: OpenFlow[String, String] = open1.prepend(open2)
|
||||
"open3.run" shouldNot compile
|
||||
|
||||
val closedInput: OpenOutputFlow[String, String] = open3.withInput(strSeq)
|
||||
"closedInput.run" shouldNot compile
|
||||
|
||||
val closedOutput: OpenInputFlow[String, String] = open3.withOutput(PublisherOut())
|
||||
"closedOutput.run" shouldNot compile
|
||||
|
||||
closedInput.withOutput(PublisherOut()).run
|
||||
closedOutput.withInput(strSeq).run
|
||||
}
|
||||
"append OpenInputFlow" in {
|
||||
val open: OpenFlow[Int, String] = From[Int].map(_.toString)
|
||||
val closedOutput: OpenInputFlow[String, Int] = From[String].map(_.hashCode).withOutput(PublisherOut())
|
||||
val appended: OpenInputFlow[Int, Int] = open.append(closedOutput)
|
||||
"appended.run" shouldNot compile
|
||||
"appended.toFuture" shouldNot compile
|
||||
appended.withInput(intSeq).run
|
||||
}
|
||||
"prepend OpenOutputFlow" in {
|
||||
val open: OpenFlow[Int, String] = From[Int].map(_.toString)
|
||||
val closedInput: OpenOutputFlow[String, Int] = From[String].map(_.hashCode).withInput(strSeq)
|
||||
val prepended: OpenOutputFlow[String, String] = open.prepend(closedInput)
|
||||
"prepended.run" shouldNot compile
|
||||
"prepended.withInput(strSeq)" shouldNot compile
|
||||
prepended.withOutput(PublisherOut()).run
|
||||
}
|
||||
}
|
||||
|
||||
"OpenInputFlow" should {
|
||||
val openInput: OpenInputFlow[Int, String] =
|
||||
From[Int].map(_.toString).withOutput(PublisherOut())
|
||||
"accept Input" in {
|
||||
openInput.withInput(intSeq)
|
||||
}
|
||||
"drop Output" in {
|
||||
openInput.withoutOutput
|
||||
}
|
||||
"not drop Input" in {
|
||||
"openInput.withoutInput" shouldNot compile
|
||||
}
|
||||
"not accept Output" in {
|
||||
"openInput.ToFuture" shouldNot compile
|
||||
}
|
||||
"not run" in {
|
||||
"openInput.run" shouldNot compile
|
||||
}
|
||||
}
|
||||
|
||||
"OpenOutputFlow" should {
|
||||
val openOutput: OpenOutputFlow[Int, String] =
|
||||
From(Seq(1, 2, 3)).map(_.toString)
|
||||
"accept Output" in {
|
||||
openOutput.withOutput(PublisherOut())
|
||||
}
|
||||
"drop Input" in {
|
||||
openOutput.withoutInput
|
||||
}
|
||||
"not drop Output" in {
|
||||
"openOutput.withoutOutput" shouldNot compile
|
||||
}
|
||||
"not accept Input" in {
|
||||
"openOutput.withInput(intSeq)" shouldNot compile
|
||||
}
|
||||
"not run" in {
|
||||
"openOutput.run" shouldNot compile
|
||||
}
|
||||
}
|
||||
|
||||
"ClosedFlow" should {
|
||||
val closed: ClosedFlow[Int, String] =
|
||||
From(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut())
|
||||
"run" in {
|
||||
closed.run
|
||||
}
|
||||
"drop Input" in {
|
||||
closed.withoutInput
|
||||
}
|
||||
"drop Output" in {
|
||||
closed.withoutOutput
|
||||
}
|
||||
"not accept Input" in {
|
||||
"closed.withInput(intSeq)" shouldNot compile
|
||||
}
|
||||
"not accept Output" in {
|
||||
"closed.ToFuture" shouldNot compile
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,57 +0,0 @@
|
|||
package akka.stream.dsl
|
||||
|
||||
import org.scalatest.{ WordSpec, Matchers }
|
||||
|
||||
import scala.collection.immutable.Seq
|
||||
|
||||
class GraphSpec extends WordSpec with Matchers {
|
||||
|
||||
"Graph" should {
|
||||
"merge" in {
|
||||
val merge = Merge[Int, Int, Int]()
|
||||
|
||||
val in1 = From[Int].withOutput(merge.in1)
|
||||
val in2 = From[Int].withOutput(merge.in2)
|
||||
val out1 = From[Int].withInput(merge.out)
|
||||
|
||||
val out2 = From[String]
|
||||
// FIXME: make me not compile
|
||||
//"out2.withInput(merge.out)" shouldNot compile
|
||||
}
|
||||
"zip" in {
|
||||
val zip = Zip[Int, String]()
|
||||
|
||||
val in1 = From[Int].withOutput(zip.in1)
|
||||
val in2 = From[String].withOutput(zip.in2)
|
||||
val out1 = From[(Int, String)].withInput(zip.out)
|
||||
|
||||
val out2 = From[(String, Int)]
|
||||
// FIXME: make me not compile
|
||||
//"out2.withInput(zip.out)" shouldNot compile
|
||||
}
|
||||
"concat" in {
|
||||
trait A
|
||||
trait B extends A
|
||||
|
||||
val concat = Concat[A, B, A]()
|
||||
val in1 = From[A].withOutput(concat.in1)
|
||||
val in2 = From[B].withOutput(concat.in2)
|
||||
val out1 = From[A].withInput(concat.out)
|
||||
|
||||
val out2 = From[String]
|
||||
// FIXME: make me not compile
|
||||
//"out2.withInput(concat.out)" shouldNot compile
|
||||
}
|
||||
"broadcast" in {
|
||||
val broadcast = Broadcast[Int]()
|
||||
|
||||
val in1 = From[Int].withOutput(broadcast.in)
|
||||
val in2 = From[Int].withInput(broadcast.out1)
|
||||
val out1 = From[Int].withInput(broadcast.out2)
|
||||
|
||||
val out2 = From[String]
|
||||
// FIXME: make me not compile
|
||||
//"out2.withInput(broadcast.out2)" shouldNot compile
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.scaladsl2
|
||||
|
||||
import org.scalatest.Matchers
|
||||
import org.scalatest.WordSpec
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Future
|
||||
import akka.stream.OverflowStrategy
|
||||
|
||||
class CombinatorSpec extends WordSpec with Matchers {
|
||||
val f = FlowFrom[Int]
|
||||
|
||||
"Linear simple combinators in Flow" should {
|
||||
"map" in {
|
||||
val t: ProcessorFlow[Int, Int] = f.map(_ * 2)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
146
akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala
Normal file
146
akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala
Normal file
|
|
@ -0,0 +1,146 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.scaladsl2
|
||||
|
||||
import org.scalatest.{ Matchers, WordSpec }
|
||||
import scala.collection.immutable.Seq
|
||||
import scala.concurrent.Future
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
import akka.stream.MaterializerSettings
|
||||
|
||||
class FlowSpec extends AkkaSpec {
|
||||
|
||||
val intSeq = IterableIn(Seq(1, 2, 3))
|
||||
val strSeq = IterableIn(Seq("a", "b", "c"))
|
||||
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
val intFut = FutureIn(Future { 3 })
|
||||
implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
|
||||
|
||||
"ProcessorFlow" should {
|
||||
"go through all states" in {
|
||||
val f: ProcessorFlow[Int, Int] = FlowFrom[Int]
|
||||
.withInput(intSeq)
|
||||
.withOutput(PublisherOut())
|
||||
.withoutInput
|
||||
.withoutOutput
|
||||
}
|
||||
"should not run" in {
|
||||
val open: ProcessorFlow[Int, Int] = FlowFrom[Int]
|
||||
"open.run()" shouldNot compile
|
||||
}
|
||||
"accept IterableIn" in {
|
||||
val f: PublisherFlow[Int, Int] = FlowFrom[Int].withInput(intSeq)
|
||||
}
|
||||
"accept FutureIn" in {
|
||||
val f: PublisherFlow[Int, Int] = FlowFrom[Int].withInput(intFut)
|
||||
}
|
||||
"append ProcessorFlow" in {
|
||||
val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString)
|
||||
val open2: ProcessorFlow[String, Int] = FlowFrom[String].map(_.hashCode)
|
||||
val open3: ProcessorFlow[Int, Int] = open1.append(open2)
|
||||
"open3.run()" shouldNot compile
|
||||
|
||||
val closedInput: PublisherFlow[Int, Int] = open3.withInput(intSeq)
|
||||
"closedInput.run()" shouldNot compile
|
||||
|
||||
val closedOutput: SubscriberFlow[Int, Int] = open3.withOutput(PublisherOut())
|
||||
"closedOutput.run()" shouldNot compile
|
||||
|
||||
closedInput.withOutput(PublisherOut()).run()
|
||||
closedOutput.withInput(intSeq).run()
|
||||
}
|
||||
"prepend ProcessorFlow" in {
|
||||
val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString)
|
||||
val open2: ProcessorFlow[String, Int] = FlowFrom[String].map(_.hashCode)
|
||||
val open3: ProcessorFlow[String, String] = open1.prepend(open2)
|
||||
"open3.run()" shouldNot compile
|
||||
|
||||
val closedInput: PublisherFlow[String, String] = open3.withInput(strSeq)
|
||||
"closedInput.run()" shouldNot compile
|
||||
|
||||
val closedOutput: SubscriberFlow[String, String] = open3.withOutput(PublisherOut())
|
||||
"closedOutput.run()" shouldNot compile
|
||||
|
||||
closedInput.withOutput(PublisherOut()).run
|
||||
closedOutput.withInput(strSeq).run
|
||||
}
|
||||
"append SubscriberFlow" in {
|
||||
val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString)
|
||||
val closedOutput: SubscriberFlow[String, Int] = FlowFrom[String].map(_.hashCode).withOutput(PublisherOut())
|
||||
val appended: SubscriberFlow[Int, Int] = open.append(closedOutput)
|
||||
"appended.run()" shouldNot compile
|
||||
"appended.toFuture" shouldNot compile
|
||||
appended.withInput(intSeq).run
|
||||
}
|
||||
"prepend PublisherFlow" in {
|
||||
val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString)
|
||||
val closedInput: PublisherFlow[String, Int] = FlowFrom[String].map(_.hashCode).withInput(strSeq)
|
||||
val prepended: PublisherFlow[String, String] = open.prepend(closedInput)
|
||||
"prepended.run()" shouldNot compile
|
||||
"prepended.withInput(strSeq)" shouldNot compile
|
||||
prepended.withOutput(PublisherOut()).run
|
||||
}
|
||||
}
|
||||
|
||||
"SubscriberFlow" should {
|
||||
val openInput: SubscriberFlow[Int, String] =
|
||||
FlowFrom[Int].map(_.toString).withOutput(PublisherOut())
|
||||
"accept Input" in {
|
||||
openInput.withInput(intSeq)
|
||||
}
|
||||
"drop Output" in {
|
||||
openInput.withoutOutput
|
||||
}
|
||||
"not drop Input" in {
|
||||
"openInput.withoutInput" shouldNot compile
|
||||
}
|
||||
"not accept Output" in {
|
||||
"openInput.ToFuture" shouldNot compile
|
||||
}
|
||||
"not run()" in {
|
||||
"openInput.run()" shouldNot compile
|
||||
}
|
||||
}
|
||||
|
||||
"PublisherFlow" should {
|
||||
val openOutput: PublisherFlow[Int, String] =
|
||||
FlowFrom(Seq(1, 2, 3)).map(_.toString)
|
||||
"accept Output" in {
|
||||
openOutput.withOutput(PublisherOut())
|
||||
}
|
||||
"drop Input" in {
|
||||
openOutput.withoutInput
|
||||
}
|
||||
"not drop Output" in {
|
||||
"openOutput.withoutOutput" shouldNot compile
|
||||
}
|
||||
"not accept Input" in {
|
||||
"openOutput.withInput(intSeq)" shouldNot compile
|
||||
}
|
||||
"not run()" in {
|
||||
"openOutput.run()" shouldNot compile
|
||||
}
|
||||
}
|
||||
|
||||
"RunnableFlow" should {
|
||||
val closed: RunnableFlow[Int, String] =
|
||||
FlowFrom(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut())
|
||||
"run" in {
|
||||
closed.run()
|
||||
}
|
||||
"drop Input" in {
|
||||
closed.withoutInput
|
||||
}
|
||||
"drop Output" in {
|
||||
closed.withoutOutput
|
||||
}
|
||||
"not accept Input" in {
|
||||
"closed.withInput(intSeq)" shouldNot compile
|
||||
}
|
||||
"not accept Output" in {
|
||||
"closed.ToFuture" shouldNot compile
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,403 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.scaladsl2
|
||||
|
||||
import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
|
||||
import akka.testkit.{ EventFilter, TestProbe }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import scala.collection.immutable.Seq
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.stream.Transformer
|
||||
import akka.stream.MaterializerSettings
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) {
|
||||
|
||||
implicit val materializer = FlowMaterializer(MaterializerSettings(
|
||||
initialInputBufferSize = 2,
|
||||
maximumInputBufferSize = 2,
|
||||
initialFanOutBufferSize = 2,
|
||||
maxFanOutBufferSize = 2,
|
||||
dispatcher = "akka.test.stream-dispatcher"))
|
||||
|
||||
"A Flow with transform operations" must {
|
||||
"produce one-to-one transformation as expected" in {
|
||||
val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher()
|
||||
val p2 = FlowFrom(p).
|
||||
transform("transform", () ⇒ new Transformer[Int, Int] {
|
||||
var tot = 0
|
||||
override def onNext(elem: Int) = {
|
||||
tot += elem
|
||||
List(tot)
|
||||
}
|
||||
}).
|
||||
withOutput(PublisherOut()).toPublisher()
|
||||
val subscriber = StreamTestKit.SubscriberProbe[Int]()
|
||||
p2.subscribe(subscriber)
|
||||
val subscription = subscriber.expectSubscription()
|
||||
subscription.request(1)
|
||||
subscriber.expectNext(1)
|
||||
subscriber.expectNoMsg(200.millis)
|
||||
subscription.request(2)
|
||||
subscriber.expectNext(3)
|
||||
subscriber.expectNext(6)
|
||||
subscriber.expectComplete()
|
||||
}
|
||||
|
||||
"produce one-to-several transformation as expected" in {
|
||||
val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher()
|
||||
val p2 = FlowFrom(p).
|
||||
transform("transform", () ⇒ new Transformer[Int, Int] {
|
||||
var tot = 0
|
||||
override def onNext(elem: Int) = {
|
||||
tot += elem
|
||||
Vector.fill(elem)(tot)
|
||||
}
|
||||
}).
|
||||
withOutput(PublisherOut()).toPublisher()
|
||||
val subscriber = StreamTestKit.SubscriberProbe[Int]()
|
||||
p2.subscribe(subscriber)
|
||||
val subscription = subscriber.expectSubscription()
|
||||
subscription.request(4)
|
||||
subscriber.expectNext(1)
|
||||
subscriber.expectNext(3)
|
||||
subscriber.expectNext(3)
|
||||
subscriber.expectNext(6)
|
||||
subscriber.expectNoMsg(200.millis)
|
||||
subscription.request(100)
|
||||
subscriber.expectNext(6)
|
||||
subscriber.expectNext(6)
|
||||
subscriber.expectComplete()
|
||||
}
|
||||
|
||||
"produce dropping transformation as expected" in {
|
||||
val p = FlowFrom(List(1, 2, 3, 4)).withOutput(PublisherOut()).toPublisher()
|
||||
val p2 = FlowFrom(p).
|
||||
transform("transform", () ⇒ new Transformer[Int, Int] {
|
||||
var tot = 0
|
||||
override def onNext(elem: Int) = {
|
||||
tot += elem
|
||||
if (elem % 2 == 0) {
|
||||
Nil
|
||||
} else {
|
||||
List(tot)
|
||||
}
|
||||
}
|
||||
}).
|
||||
withOutput(PublisherOut()).toPublisher()
|
||||
val subscriber = StreamTestKit.SubscriberProbe[Int]()
|
||||
p2.subscribe(subscriber)
|
||||
val subscription = subscriber.expectSubscription()
|
||||
subscription.request(1)
|
||||
subscriber.expectNext(1)
|
||||
subscriber.expectNoMsg(200.millis)
|
||||
subscription.request(1)
|
||||
subscriber.expectNext(6)
|
||||
subscription.request(1)
|
||||
subscriber.expectComplete()
|
||||
}
|
||||
|
||||
"produce multi-step transformation as expected" in {
|
||||
val p = FlowFrom(List("a", "bc", "def")).withOutput(PublisherOut()).toPublisher()
|
||||
val p2 = FlowFrom(p).
|
||||
transform("transform", () ⇒ new Transformer[String, Int] {
|
||||
var concat = ""
|
||||
override def onNext(elem: String) = {
|
||||
concat += elem
|
||||
List(concat.length)
|
||||
}
|
||||
}).
|
||||
transform("transform", () ⇒ new Transformer[Int, Int] {
|
||||
var tot = 0
|
||||
override def onNext(length: Int) = {
|
||||
tot += length
|
||||
List(tot)
|
||||
}
|
||||
}).
|
||||
withOutput(PublisherOut()).toPublisher()
|
||||
val c1 = StreamTestKit.SubscriberProbe[Int]()
|
||||
p2.subscribe(c1)
|
||||
val sub1 = c1.expectSubscription()
|
||||
val c2 = StreamTestKit.SubscriberProbe[Int]()
|
||||
p2.subscribe(c2)
|
||||
val sub2 = c2.expectSubscription()
|
||||
sub1.request(1)
|
||||
sub2.request(2)
|
||||
c1.expectNext(1)
|
||||
c2.expectNext(1)
|
||||
c2.expectNext(4)
|
||||
c1.expectNoMsg(200.millis)
|
||||
sub1.request(2)
|
||||
sub2.request(2)
|
||||
c1.expectNext(4)
|
||||
c1.expectNext(10)
|
||||
c2.expectNext(10)
|
||||
c1.expectComplete()
|
||||
c2.expectComplete()
|
||||
}
|
||||
|
||||
"invoke onComplete when done" in {
|
||||
val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher()
|
||||
val p2 = FlowFrom(p).
|
||||
transform("transform", () ⇒ new Transformer[String, String] {
|
||||
var s = ""
|
||||
override def onNext(element: String) = {
|
||||
s += element
|
||||
Nil
|
||||
}
|
||||
override def onTermination(e: Option[Throwable]) = List(s + "B")
|
||||
}).
|
||||
withOutput(PublisherOut()).toPublisher()
|
||||
val c = StreamTestKit.SubscriberProbe[String]()
|
||||
p2.subscribe(c)
|
||||
val s = c.expectSubscription()
|
||||
s.request(1)
|
||||
c.expectNext("aB")
|
||||
c.expectComplete()
|
||||
}
|
||||
|
||||
"invoke cleanup when done" in {
|
||||
val cleanupProbe = TestProbe()
|
||||
val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher()
|
||||
val p2 = FlowFrom(p).
|
||||
transform("transform", () ⇒ new Transformer[String, String] {
|
||||
var s = ""
|
||||
override def onNext(element: String) = {
|
||||
s += element
|
||||
Nil
|
||||
}
|
||||
override def onTermination(e: Option[Throwable]) = List(s + "B")
|
||||
override def cleanup() = cleanupProbe.ref ! s
|
||||
}).
|
||||
withOutput(PublisherOut()).toPublisher()
|
||||
val c = StreamTestKit.SubscriberProbe[String]()
|
||||
p2.subscribe(c)
|
||||
val s = c.expectSubscription()
|
||||
s.request(1)
|
||||
c.expectNext("aB")
|
||||
c.expectComplete()
|
||||
cleanupProbe.expectMsg("a")
|
||||
}
|
||||
|
||||
"invoke cleanup when done consume" in {
|
||||
val cleanupProbe = TestProbe()
|
||||
val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher()
|
||||
FlowFrom(p).
|
||||
transform("transform", () ⇒ new Transformer[String, String] {
|
||||
var s = "x"
|
||||
override def onNext(element: String) = {
|
||||
s = element
|
||||
List(element)
|
||||
}
|
||||
override def cleanup() = cleanupProbe.ref ! s
|
||||
}).
|
||||
withOutput(BlackholeOut()).run()
|
||||
cleanupProbe.expectMsg("a")
|
||||
}
|
||||
|
||||
"invoke cleanup when done after error" in {
|
||||
val cleanupProbe = TestProbe()
|
||||
val p = FlowFrom(List("a", "b", "c")).withOutput(PublisherOut()).toPublisher()
|
||||
val p2 = FlowFrom(p).
|
||||
transform("transform", () ⇒ new Transformer[String, String] {
|
||||
var s = ""
|
||||
override def onNext(in: String) = {
|
||||
if (in == "b") {
|
||||
throw new IllegalArgumentException("Not b") with NoStackTrace
|
||||
} else {
|
||||
val out = s + in
|
||||
s += in.toUpperCase
|
||||
List(out)
|
||||
}
|
||||
}
|
||||
override def onTermination(e: Option[Throwable]) = List(s + "B")
|
||||
override def cleanup() = cleanupProbe.ref ! s
|
||||
}).
|
||||
withOutput(PublisherOut()).toPublisher()
|
||||
val c = StreamTestKit.SubscriberProbe[String]()
|
||||
p2.subscribe(c)
|
||||
val s = c.expectSubscription()
|
||||
s.request(1)
|
||||
c.expectNext("a")
|
||||
s.request(1)
|
||||
c.expectError()
|
||||
cleanupProbe.expectMsg("A")
|
||||
}
|
||||
|
||||
"allow cancellation using isComplete" in {
|
||||
val p = StreamTestKit.PublisherProbe[Int]()
|
||||
val p2 = FlowFrom(p).
|
||||
transform("transform", () ⇒ new Transformer[Int, Int] {
|
||||
var s = ""
|
||||
override def onNext(element: Int) = {
|
||||
s += element
|
||||
List(element)
|
||||
}
|
||||
override def isComplete = s == "1"
|
||||
}).
|
||||
withOutput(PublisherOut()).toPublisher()
|
||||
val proc = p.expectSubscription
|
||||
val c = StreamTestKit.SubscriberProbe[Int]()
|
||||
p2.subscribe(c)
|
||||
val s = c.expectSubscription()
|
||||
s.request(10)
|
||||
proc.sendNext(1)
|
||||
proc.sendNext(2)
|
||||
c.expectNext(1)
|
||||
c.expectComplete()
|
||||
proc.expectCancellation()
|
||||
}
|
||||
|
||||
"call onComplete after isComplete signaled completion" in {
|
||||
val cleanupProbe = TestProbe()
|
||||
val p = StreamTestKit.PublisherProbe[Int]()
|
||||
val p2 = FlowFrom(p).
|
||||
transform("transform", () ⇒ new Transformer[Int, Int] {
|
||||
var s = ""
|
||||
override def onNext(element: Int) = {
|
||||
s += element
|
||||
List(element)
|
||||
}
|
||||
override def isComplete = s == "1"
|
||||
override def onTermination(e: Option[Throwable]) = List(s.length + 10)
|
||||
override def cleanup() = cleanupProbe.ref ! s
|
||||
}).
|
||||
withOutput(PublisherOut()).toPublisher()
|
||||
val proc = p.expectSubscription
|
||||
val c = StreamTestKit.SubscriberProbe[Int]()
|
||||
p2.subscribe(c)
|
||||
val s = c.expectSubscription()
|
||||
s.request(10)
|
||||
proc.sendNext(1)
|
||||
proc.sendNext(2)
|
||||
c.expectNext(1)
|
||||
c.expectNext(11)
|
||||
c.expectComplete()
|
||||
proc.expectCancellation()
|
||||
cleanupProbe.expectMsg("1")
|
||||
}
|
||||
|
||||
"report error when exception is thrown" in {
|
||||
val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher()
|
||||
val p2 = FlowFrom(p).
|
||||
transform("transform", () ⇒ new Transformer[Int, Int] {
|
||||
override def onNext(elem: Int) = {
|
||||
if (elem == 2) {
|
||||
throw new IllegalArgumentException("two not allowed")
|
||||
} else {
|
||||
List(elem, elem)
|
||||
}
|
||||
}
|
||||
}).
|
||||
withOutput(PublisherOut()).toPublisher()
|
||||
val subscriber = StreamTestKit.SubscriberProbe[Int]()
|
||||
p2.subscribe(subscriber)
|
||||
val subscription = subscriber.expectSubscription()
|
||||
EventFilter[IllegalArgumentException]("two not allowed") intercept {
|
||||
subscription.request(100)
|
||||
subscriber.expectNext(1)
|
||||
subscriber.expectNext(1)
|
||||
subscriber.expectError().getMessage should be("two not allowed")
|
||||
subscriber.expectNoMsg(200.millis)
|
||||
}
|
||||
}
|
||||
|
||||
"support cancel as expected" in {
|
||||
val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher()
|
||||
val p2 = FlowFrom(p).
|
||||
transform("transform", () ⇒ new Transformer[Int, Int] {
|
||||
override def onNext(elem: Int) = List(elem, elem)
|
||||
}).
|
||||
withOutput(PublisherOut()).toPublisher()
|
||||
val subscriber = StreamTestKit.SubscriberProbe[Int]()
|
||||
p2.subscribe(subscriber)
|
||||
val subscription = subscriber.expectSubscription()
|
||||
subscription.request(2)
|
||||
subscriber.expectNext(1)
|
||||
subscription.cancel()
|
||||
subscriber.expectNext(1)
|
||||
subscriber.expectNoMsg(500.millis)
|
||||
subscription.request(2)
|
||||
subscriber.expectNoMsg(200.millis)
|
||||
}
|
||||
|
||||
"support producing elements from empty inputs" in {
|
||||
val p = FlowFrom(List.empty[Int]).withOutput(PublisherOut()).toPublisher()
|
||||
val p2 = FlowFrom(p).
|
||||
transform("transform", () ⇒ new Transformer[Int, Int] {
|
||||
override def onNext(elem: Int) = Nil
|
||||
override def onTermination(e: Option[Throwable]) = List(1, 2, 3)
|
||||
}).
|
||||
withOutput(PublisherOut()).toPublisher()
|
||||
val subscriber = StreamTestKit.SubscriberProbe[Int]()
|
||||
p2.subscribe(subscriber)
|
||||
val subscription = subscriber.expectSubscription()
|
||||
subscription.request(4)
|
||||
subscriber.expectNext(1)
|
||||
subscriber.expectNext(2)
|
||||
subscriber.expectNext(3)
|
||||
subscriber.expectComplete()
|
||||
|
||||
}
|
||||
|
||||
"support converting onComplete into onError" in {
|
||||
val subscriber = StreamTestKit.SubscriberProbe[Int]()
|
||||
FlowFrom(List(5, 1, 2, 3)).transform("transform", () ⇒ new Transformer[Int, Int] {
|
||||
var expectedNumberOfElements: Option[Int] = None
|
||||
var count = 0
|
||||
override def onNext(elem: Int) =
|
||||
if (expectedNumberOfElements.isEmpty) {
|
||||
expectedNumberOfElements = Some(elem)
|
||||
Nil
|
||||
} else {
|
||||
count += 1
|
||||
List(elem)
|
||||
}
|
||||
override def onTermination(err: Option[Throwable]) = err match {
|
||||
case Some(e) ⇒ Nil
|
||||
case None ⇒
|
||||
expectedNumberOfElements match {
|
||||
case Some(expected) if count != expected ⇒
|
||||
throw new RuntimeException(s"Expected $expected, got $count") with NoStackTrace
|
||||
case _ ⇒ Nil
|
||||
}
|
||||
}
|
||||
}).withOutput(PublisherOut()).produceTo(subscriber)
|
||||
|
||||
val subscription = subscriber.expectSubscription()
|
||||
subscription.request(10)
|
||||
|
||||
subscriber.expectNext(1)
|
||||
subscriber.expectNext(2)
|
||||
subscriber.expectNext(3)
|
||||
subscriber.expectError().getMessage should be("Expected 5, got 3")
|
||||
}
|
||||
|
||||
"be safe to reuse" in {
|
||||
val flow = FlowFrom(1 to 3).transform("transform", () ⇒
|
||||
new Transformer[Int, Int] {
|
||||
var count = 0
|
||||
|
||||
override def onNext(elem: Int): Seq[Int] = {
|
||||
count += 1
|
||||
List(count)
|
||||
}
|
||||
}).withOutput(PublisherOut())
|
||||
|
||||
val s1 = StreamTestKit.SubscriberProbe[Int]()
|
||||
flow.produceTo(s1)
|
||||
s1.expectSubscription().request(3)
|
||||
s1.expectNext(1, 2, 3)
|
||||
s1.expectComplete()
|
||||
|
||||
val s2 = StreamTestKit.SubscriberProbe[Int]()
|
||||
flow.produceTo(s2)
|
||||
s2.expectSubscription().request(3)
|
||||
s2.expectNext(1, 2, 3)
|
||||
s2.expectComplete()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue