diff --git a/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala deleted file mode 100644 index 4a9defde83..0000000000 --- a/akka-http-core/src/main/scala/akka/http/server/NewDslHttpServerPipeline.scala +++ /dev/null @@ -1,114 +0,0 @@ -package akka.http.server - -import akka.event.LoggingAdapter -import akka.http.Http -import akka.http.model.{ ErrorInfo, HttpRequest, HttpResponse } -import akka.http.parsing.HttpRequestParser -import akka.http.parsing.ParserOutput._ -import akka.http.rendering.ResponseRenderingContext -import akka.stream.dsl._ -import akka.stream.io.StreamTcp -import akka.stream.{ FlowMaterializer, Transformer } -import akka.util.ByteString - -class NewDslHttpServerPipeline(settings: ServerSettings, - materializer: FlowMaterializer, - log: LoggingAdapter) { - - import akka.http.server.NewDslHttpServerPipeline._ - - val rootParser = new HttpRequestParser(settings.parserSettings, settings.rawRequestUriHeader, materializer)() - val warnOnIllegalHeader: ErrorInfo ⇒ Unit = errorInfo ⇒ - if (settings.parserSettings.illegalHeaderWarnings) - log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty) - - val responseRendererFactory = new { - def newRenderer: Transformer[ResponseRenderingContext, OpenOutputFlow[ByteString, ByteString]] = ??? - } - - /** - * Flow graph: - * - * tcpConn.inputStream ---> requestFlowBeforeBroadcast -+-> requestFlowAfterBroadcast ---> Publisher[HttpRequest] - * | - * \-> applicationBypassFlow -\ - * | - * Subscriber[HttpResponse] ---> responseFlowBeforeMerge -+-> responseFlowAfterMerge --> tcpConn.outputStream - */ - def apply(tcpConn: StreamTcp.IncomingTcpConnection) = { - - val broadcast = Broadcast[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])]() - val merge = Merge[MessageStart, HttpResponse, Any]() - - val requestFlowBeforeBroadcast: ClosedFlow[ByteString, (RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] = - From(tcpConn.inputStream) - .transform(rootParser.copyWith(warnOnIllegalHeader)) - .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) - .headAndTail() - .withOutput(broadcast.in) - - val applicationBypassFlow: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), MessageStart] = - From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] - .withInput(broadcast.out1) - .collect[MessageStart with RequestOutput] { case (x: MessageStart, _) ⇒ x } - .withOutput(merge.in1) - - val requestPublisher = PublisherOut[HttpRequest]() - val requestFlowAfterBroadcast: ClosedFlow[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput]), HttpRequest] = - From[(RequestOutput, OpenOutputFlow[RequestOutput, RequestOutput])] - .withInput(broadcast.out2) - .collect { - case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts) ⇒ - val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) - val publisher = PublisherOut[RequestOutput]() - val flow = entityParts.withOutput(publisher) - HttpRequest(method, effectiveUri, headers, createEntity(publisher.publisher), protocol) - } - .withOutput(requestPublisher) - - val responseSubscriber = SubscriberIn[HttpResponse]() - val responseFlowBeforeMerge: ClosedFlow[HttpResponse, HttpResponse] = - From[HttpResponse] - .withInput(responseSubscriber) - .withOutput(merge.in2) - - val responseFlowAfterMerge: ClosedFlow[Any, ByteString] = - From[Any] - .withInput(merge.out) - .transform(applyApplicationBypass) - .transform(responseRendererFactory.newRenderer) - .flatten(FlattenStrategy.concatOpenOutputFlow) - .transform(errorLogger(log, "Outgoing response stream error")) - .withOutput(SubscriberOut(tcpConn.outputStream)) - - Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher.publisher, responseSubscriber.subscriber) - } - - def applyApplicationBypass: Transformer[Any, ResponseRenderingContext] = ??? - - private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = ??? -} - -object NewDslHttpServerPipeline { - - /** - * FIXME: We can't use `HasOpenOutput` here, because conversion would convert either `OpenFlow` - * or `OpenOutputFlow` to `HasOpenOutput`. - * - * Therefore we need two separate conversions, one for `OpeFlow` another for `OpenOutputFlow`. - */ - implicit class OpenOutputFlowWithHeadAndTail[In, InnerIn, InnerOut](val underlying: OpenOutputFlow[In, OpenOutputFlow[InnerIn, InnerOut]]) extends AnyVal { - def headAndTail(): OpenOutputFlow[In, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] = { - val flow: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], OpenOutputFlow[InnerIn, (InnerOut, OpenOutputFlow[InnerOut, InnerOut])]] = - From[OpenOutputFlow[InnerIn, InnerOut]] - .map { f ⇒ - f.prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) } - } - - val flattened: OpenFlow[OpenOutputFlow[InnerIn, InnerOut], (InnerOut, OpenOutputFlow[InnerOut, InnerOut])] = - flow.flatten(FlattenStrategy.concatOpenOutputFlow) - - underlying.append(flattened) - } - } -} diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala deleted file mode 100644 index aa2edb4dd8..0000000000 --- a/akka-stream/src/main/scala/akka/stream/dsl/Flow.scala +++ /dev/null @@ -1,228 +0,0 @@ -package akka.stream.dsl - -import akka.stream.impl.Ast -import org.reactivestreams.{ Subscriber, Publisher } - -import scala.collection.immutable -import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration -import akka.stream.{ Transformer, OverflowStrategy, FlattenStrategy } - -sealed trait Flow[-In, +Out] { - val transform: Transform[In, Out] -} - -object From { - /** - * Helper to create Flow without Input. - * Example usage: From[Int] - */ - def apply[T]: OpenFlow[T, T] = OpenFlow[T, T](EmptyTransform[T, T]()) - - /** - * Helper to create Flow with Input from Iterable. - * Example usage: Flow(Seq(1,2,3)) - */ - def apply[T](i: immutable.Iterable[T]): OpenOutputFlow[T, T] = From[T].withInput(IterableIn(i)) - - /** - * Helper to create Flow with Input from Future. - * Example usage: Flow(Future { 1 }) - */ - def apply[T](f: Future[T]): OpenOutputFlow[T, T] = From[T].withInput(FutureIn(f)) - - /** - * Helper to create Flow with Input from Publisher. - */ - def apply[T](p: Publisher[T]): OpenOutputFlow[T, T] = From[T].withInput(PublisherIn(p)) -} - -trait Input[-In] - -/** - * Default input. - * Allows to materialize a Flow with this input to Subscriber. - */ -final case class SubscriberIn[-In]() extends Input[In] { - def subscriber[I <: In]: Subscriber[I] = ??? -} - -/** - * Input from Publisher. - */ -final case class PublisherIn[-In](p: Publisher[_ >: In]) extends Input[In] - -/** - * Input from Iterable - * - * Changing In from Contravariant to Covariant is needed because Iterable[+A]. - * But this brakes IterableIn variance and we get IterableIn(Seq(1,2,3)): IterableIn[Any] - */ -final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[In] - -/** - * Input from Future - * - * Changing In from Contravariant to Covariant is needed because Future[+A]. - * But this brakes FutureIn variance and we get FutureIn(Future{1}): FutureIn[Any] - */ -final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In] - -trait Output[+Out] - -/** - * Default output. - * Allows to materialize a Flow with this output to Publisher. - */ -final case class PublisherOut[+Out]() extends Output[Out] { - def publisher[O >: Out]: Publisher[O] = ??? -} - -/** - * Output to a Subscriber. - */ -final case class SubscriberOut[+Out](s: Subscriber[_ <: Out]) extends Output[Out] - -/** - * Fold output. Reduces output stream according to the given fold function. - */ -final case class FoldOut[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Output[Out] { - def future: Future[T] = ??? -} - -/** - * Operations with a Flow which has open (no attached) Input. - * - * No Out type parameter would be useful for Graph signatures, but we need it here - * for `withInput` and `prependTransform` methods. - */ -sealed trait HasOpenInput[-In, +Out] { - type Repr[-In, +Out] <: HasOpenInput[In, Out] - type AfterCloseInput[-In, +Out] <: Flow[In, Out] - - def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] - protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] - - // linear combinators with flows - def prepend[T](f: OpenFlow[T, In]): Repr[T, Out] = - prependTransform(f.transform) - def prepend[T](f: OpenOutputFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = - prependTransform(f.transform).withInput(f.input) -} - -/** - * Operations with a Flow which has open (no attached) Output. - * - * No In type parameter would be useful for Graph signatures, but we need it here - * for `withOutput` and `appendTransform` methods. - */ -trait HasOpenOutput[-In, +Out] { - type Repr[-In, +Out] <: HasOpenOutput[In, Out] - type AfterCloseOutput[-In, +Out] <: Flow[In, Out] - - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] - protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] - - // linear simple combinators - def map[T](f: Out ⇒ T): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) - def mapFuture[T](f: Out ⇒ Future[T]): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) - def filter(p: Out ⇒ Boolean): Repr[In, Out] = - appendTransform(EmptyTransform[Out, Out]()) - def collect[T](pf: PartialFunction[Out, T]): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) - def drop(n: Int): Repr[In, Out] = - appendTransform(EmptyTransform[Out, Out]()) - def dropWithin(d: FiniteDuration): Repr[In, Out] = - appendTransform(EmptyTransform[Out, Out]()) - def take(n: Int): Repr[In, Out] = - appendTransform(EmptyTransform[Out, Out]()) - def takeWithin(d: FiniteDuration): Repr[In, Out] = - appendTransform(EmptyTransform[Out, Out]()) - def grouped(n: Int): Repr[In, immutable.Seq[Out]] = - appendTransform(EmptyTransform[Out, immutable.Seq[Out]]()) - def groupedWithin(n: Int, d: FiniteDuration): Repr[In, immutable.Seq[Out]] = - appendTransform(EmptyTransform[Out, immutable.Seq[Out]]()) - def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) - def transform[T](transformer: Transformer[Out, T]): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) - def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[In, S] = - appendTransform(EmptyTransform[Out, S]()) - def expand[S, O](seed: Out ⇒ S, extrapolate: S ⇒ (O, S)): Repr[In, O] = - appendTransform(EmptyTransform[Out, O]()) - def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[In, Out] = - appendTransform(EmptyTransform[Out, Out]()) - - // linear combinators which produce multiple flows - def prefixAndTail[O >: Out](n: Int): Repr[In, (immutable.Seq[O], OpenOutputFlow[O, O])] = - appendTransform(EmptyTransform[Out, (immutable.Seq[O], OpenOutputFlow[O, O])]()) - def groupBy[O >: Out, K](f: O ⇒ K): Repr[In, (K, OpenOutputFlow[O, O])] = - appendTransform(EmptyTransform[Out, (K, OpenOutputFlow[O, O])]()) - def splitWhen[O >: Out](p: Out ⇒ Boolean): Repr[In, OpenOutputFlow[O, O]] = - appendTransform(EmptyTransform[Out, OpenOutputFlow[O, O]]()) - - // linear combinators which consume multiple flows - def flatten[T](strategy: FlattenStrategy[Out, T]): Repr[In, T] = - appendTransform(EmptyTransform[Out, T]()) - - // linear combinators with flows - def append[T](f: OpenFlow[Out, T]): Repr[In, T] = - appendTransform(f.transform) - def append[T](f: OpenInputFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = - appendTransform(f.transform).withOutput(f.output) -} - -final case class OpenFlow[-In, +Out](transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] with HasOpenInput[In, Out] { - override type Repr[-In, +Out] = OpenFlow[In, Out] - type AfterCloseOutput[-In, +Out] = OpenInputFlow[In, Out] - type AfterCloseInput[-In, +Out] = OpenOutputFlow[In, Out] - - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = OpenInputFlow(out, transform) - def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = OpenOutputFlow(in, transform) - - protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = OpenFlow(t ++ transform) - protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] = OpenFlow(transform ++ t) -} - -final case class OpenInputFlow[-In, +Out](output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenInput[In, Out] { - type Repr[-In, +Out] = OpenInputFlow[In, Out] - type AfterCloseInput[-In, +Out] = ClosedFlow[In, Out] - - def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = ClosedFlow(in, output, transform) - def withoutOutput: OpenFlow[In, Out] = OpenFlow(transform) - - protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] = - OpenInputFlow(output, t ++ transform) -} - -final case class OpenOutputFlow[-In, +Out](input: Input[In], transform: Transform[In, Out]) extends Flow[In, Out] with HasOpenOutput[In, Out] { - override type Repr[-In, +Out] = OpenOutputFlow[In, Out] - type AfterCloseOutput[-In, +Out] = ClosedFlow[In, Out] - - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = ClosedFlow(input, out, transform) - def withoutInput: OpenFlow[In, Out] = OpenFlow(transform) - - protected def appendTransform[T](t: Transform[Out, T]) = OpenOutputFlow(input, transform ++ t) -} - -final case class ClosedFlow[-In, +Out](input: Input[In], output: Output[Out], transform: Transform[In, Out]) extends Flow[In, Out] { - def withoutOutput: OpenOutputFlow[In, Out] = OpenOutputFlow(input, transform) - def withoutInput: OpenInputFlow[In, Out] = OpenInputFlow(output, transform) - - def run(): Unit = () -} - -trait Transform[-In, +Out] { - def ++[T](t: Transform[Out, T]): Transform[In, T] = EmptyTransform[In, T]() -} -final case class EmptyTransform[-In, +Out]() extends Transform[In, Out] - -object FlattenStrategy { - def concatOpenOutputFlow[In, Out]: FlattenStrategy[OpenOutputFlow[In, Out], Out] = ConcatOpenOutputFlow[In, Out]() - def concatOpenFlow[In, Out]: FlattenStrategy[OpenFlow[In, Out], Out] = ConcatOpenFlow[In, Out]() - - final case class ConcatOpenOutputFlow[In, Out]() extends FlattenStrategy[OpenOutputFlow[In, Out], Out] - final case class ConcatOpenFlow[In, Out]() extends FlattenStrategy[OpenFlow[In, Out], Out] -} diff --git a/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala deleted file mode 100644 index 3030f6cff9..0000000000 --- a/akka-stream/src/main/scala/akka/stream/dsl/Graph.scala +++ /dev/null @@ -1,25 +0,0 @@ -package akka.stream.dsl - -final case class Merge[T, U, V >: T with U]() { - val in1 = new Output[T] {} - val in2 = new Output[U] {} - val out = new Input[V] {} -} - -final case class Zip[T, U]() { - val in1 = new Output[T] {} - val in2 = new Output[U] {} - val out = new Input[(T, U)] {} -} - -final case class Concat[T, U, V >: T with U]() { - val in1 = new Output[T] {} - val in2 = new Output[U] {} - val out = new Input[V] {} -} - -final case class Broadcast[T]() { - val in = new Output[T] {} - val out1 = new Input[T] {} - val out2 = new Input[T] {} -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 6a890ed28e..b084e06348 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -45,7 +45,7 @@ private[akka] object ActorProcessor { /** * INTERNAL API */ -private[akka] class ActorProcessor[I, O] private (impl: ActorRef) extends ActorPublisher[O](impl, None) +private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[O](impl, None) with Processor[I, O] { override def onSubscribe(s: Subscription): Unit = impl ! OnSubscribe(s) override def onError(t: Throwable): Unit = impl ! OnError(t) diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala new file mode 100644 index 0000000000..a6024b9f51 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -0,0 +1,171 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl2 + +import java.util.concurrent.atomic.AtomicLong +import akka.actor.{ Actor, ActorCell, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, LocalActorRef, Props, RepointableActorRef } +import akka.pattern.ask +import org.reactivestreams.{ Processor, Publisher, Subscriber } +import scala.annotation.tailrec +import scala.collection.immutable +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ +import scala.util.{ Failure, Success } +import akka.stream.Transformer +import akka.stream.scaladsl2.FlowMaterializer +import akka.stream.MaterializerSettings +import akka.stream.impl.EmptyPublisher +import akka.stream.impl.ActorPublisher +import akka.stream.impl.IterablePublisher +import akka.stream.impl.TransformProcessorImpl +import akka.stream.impl.ActorProcessor +import akka.stream.impl.ExposedPublisher + +/** + * INTERNAL API + */ +private[akka] object Ast { + sealed trait AstNode { + def name: String + } + + case class Transform(name: String, mkTransformer: () ⇒ Transformer[Any, Any]) extends AstNode + + trait PublisherNode[I] { + private[akka] def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] + } + + final case class ExistingPublisher[I](publisher: Publisher[I]) extends PublisherNode[I] { + def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String) = publisher + } + + final case class IterablePublisherNode[I](iterable: immutable.Iterable[I]) extends PublisherNode[I] { + def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] = + if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]] + else ActorPublisher[I](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings), + name = s"$flowName-0-iterable"), Some(iterable)) + } + +} + +/** + * INTERNAL API + */ +private[akka] case class ActorBasedFlowMaterializer( + override val settings: MaterializerSettings, + supervisor: ActorRef, + flowNameCounter: AtomicLong, + namePrefix: String) + extends FlowMaterializer(settings) { + import akka.stream.impl2.Ast._ + + def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name) + + private def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet() + + private def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}" + + @tailrec private def processorChain(topSubscriber: Subscriber[_], ops: immutable.Seq[AstNode], + flowName: String, n: Int): Subscriber[_] = { + ops match { + case op :: tail ⇒ + val opProcessor: Processor[Any, Any] = processorForNode(op, flowName, n) + opProcessor.subscribe(topSubscriber.asInstanceOf[Subscriber[Any]]) + processorChain(opProcessor, tail, flowName, n - 1) + case _ ⇒ topSubscriber + } + } + + // Ops come in reverse order + override def toPublisher[I, O](publisherNode: PublisherNode[I], ops: List[AstNode]): Publisher[O] = { + val flowName = createFlowName() + if (ops.isEmpty) publisherNode.createPublisher(this, flowName).asInstanceOf[Publisher[O]] + else { + val opsSize = ops.size + val opProcessor = processorForNode(ops.head, flowName, opsSize) + val topSubscriber = processorChain(opProcessor, ops.tail, flowName, opsSize - 1) + publisherNode.createPublisher(this, flowName).subscribe(topSubscriber.asInstanceOf[Subscriber[I]]) + opProcessor.asInstanceOf[Publisher[O]] + } + } + + private val identityTransform = Transform("identity", () ⇒ + new Transformer[Any, Any] { + override def onNext(element: Any) = List(element) + }) + + def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { + val impl = actorOf(ActorProcessorFactory.props(settings, op), s"$flowName-$n-${op.name}") + ActorProcessorFactory(impl) + } + + def actorOf(props: Props, name: String): ActorRef = supervisor match { + case ref: LocalActorRef ⇒ + ref.underlying.attachChild(props, name, systemService = false) + case ref: RepointableActorRef ⇒ + if (ref.isStarted) + ref.underlying.asInstanceOf[ActorCell].attachChild(props, name, systemService = false) + else { + implicit val timeout = ref.system.settings.CreationTimeout + val f = (supervisor ? StreamSupervisor.Materialize(props, name)).mapTo[ActorRef] + Await.result(f, timeout.duration) + } + case _ ⇒ + throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]") + } + +} + +/** + * INTERNAL API + */ +private[akka] object FlowNameCounter extends ExtensionId[FlowNameCounter] with ExtensionIdProvider { + override def get(system: ActorSystem): FlowNameCounter = super.get(system) + override def lookup = FlowNameCounter + override def createExtension(system: ExtendedActorSystem): FlowNameCounter = new FlowNameCounter +} + +/** + * INTERNAL API + */ +private[akka] class FlowNameCounter extends Extension { + val counter = new AtomicLong(0) +} + +/** + * INTERNAL API + */ +private[akka] object StreamSupervisor { + def props(settings: MaterializerSettings): Props = Props(new StreamSupervisor(settings)) + + case class Materialize(props: Props, name: String) +} + +private[akka] class StreamSupervisor(settings: MaterializerSettings) extends Actor { + import StreamSupervisor._ + + def receive = { + case Materialize(props, name) ⇒ + val impl = context.actorOf(props, name) + sender() ! impl + } +} + +/** + * INTERNAL API + */ +private[akka] object ActorProcessorFactory { + + import Ast._ + def props(settings: MaterializerSettings, op: AstNode): Props = + (op match { + case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer())) + }).withDispatcher(settings.dispatcher) + + def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = { + val p = new ActorProcessor[I, O](impl) + impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]]) + p + } +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala new file mode 100644 index 0000000000..832bc9d17b --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -0,0 +1,218 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.language.higherKinds +import scala.collection.immutable +import scala.concurrent.Future +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber +import akka.stream.Transformer +import akka.stream.impl.BlackholeSubscriber +import akka.stream.impl2.Ast._ + +sealed trait Flow + +object FlowFrom { + /** + * Helper to create `Flow` without [[Input]]. + * Example usage: `FlowFrom[Int]` + */ + def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil) + + /** + * Helper to create `Flow` with Input from `Iterable`. + * Example usage: `FlowFrom(Seq(1,2,3))` + */ + def apply[T](i: immutable.Iterable[T]): PublisherFlow[T, T] = FlowFrom[T].withInput(IterableIn(i)) + + /** + * Helper to create `Flow` with [[Input]] from `Publisher`. + */ + def apply[T](p: Publisher[T]): PublisherFlow[T, T] = FlowFrom[T].withInput(PublisherIn(p)) +} + +trait Input[-In] + +/** + * Default input. + * Allows to materialize a Flow with this input to Subscriber. + */ +final case class SubscriberIn[-In]() extends Input[In] { + def subscriber[I <: In]: Subscriber[I] = ??? +} + +/** + * Input from Publisher. + */ +final case class PublisherIn[-In](p: Publisher[_ >: In]) extends Input[In] + +/** + * Input from Iterable + * + * Changing In from Contravariant to Covariant is needed because Iterable[+A]. + * But this brakes IterableIn variance and we get IterableIn(Seq(1,2,3)): IterableIn[Any] + */ +final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[In] + +/** + * Input from Future + * + * Changing In from Contravariant to Covariant is needed because Future[+A]. + * But this brakes FutureIn variance and we get FutureIn(Future{1}): FutureIn[Any] + */ +final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In] + +trait Output[+Out] + +/** + * Default output. + * Allows to materialize a Flow with this output to Publisher. + */ +final case class PublisherOut[+Out]() extends Output[Out] { + def publisher[O >: Out]: Publisher[O] = ??? +} + +final case class BlackholeOut[+Out]() extends Output[Out] { + def publisher[O >: Out]: Publisher[O] = ??? +} + +/** + * Output to a Subscriber. + */ +final case class SubscriberOut[+Out](s: Subscriber[_ <: Out]) extends Output[Out] + +/** + * Fold output. Reduces output stream according to the given fold function. + */ +final case class FoldOut[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Output[Out] { + def future: Future[T] = ??? +} + +/** + * Operations with a Flow which has open (no attached) Input. + * + * No Out type parameter would be useful for Graph signatures, but we need it here + * for `withInput` and `prependTransform` methods. + */ +sealed trait HasOpenInput[-In, +Out] extends Flow { + type Repr[-In, +Out] <: HasOpenInput[In, Out] + type AfterCloseInput[-In, +Out] <: Flow + + def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] + + def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] + def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] + +} + +/** + * Operations with a Flow which has open (no attached) Output. + * + * No In type parameter would be useful for Graph signatures, but we need it here + * for `withOutput`. + */ +trait HasOpenOutput[-In, +Out] extends Flow { + type Repr[-In, +Out] <: HasOpenOutput[In, Out] + type AfterCloseOutput[-In, +Out] <: Flow + + // Storing ops in reverse order + protected def andThen[U](op: AstNode): Repr[In, U] + + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] + + def map[T](f: Out ⇒ T): Repr[In, T] = + transform("map", () ⇒ new Transformer[Out, T] { + override def onNext(in: Out) = List(f(in)) + }) + + def transform[T](name: String, mkTransformer: () ⇒ Transformer[Out, T]): Repr[In, T] = { + andThen(Transform(name, mkTransformer.asInstanceOf[() ⇒ Transformer[Any, Any]])) + } + + def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] + def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] +} + +/** + * Flow without attached input and without attached output, can be used as a `Processor`. + */ +final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends HasOpenOutput[In, Out] with HasOpenInput[In, Out] { + override type Repr[-In, +Out] = ProcessorFlow[In, Out] + type AfterCloseOutput[-In, +Out] = SubscriberFlow[In, Out] + type AfterCloseInput[-In, +Out] = PublisherFlow[In, Out] + + override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) + + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = SubscriberFlow(out, ops) + def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = PublisherFlow(in, ops) + + override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = + ProcessorFlow(ops ::: f.ops) + override def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = + PublisherFlow(f.input, ops ::: f.ops) + + override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = ProcessorFlow(f.ops ++: ops) + override def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = + SubscriberFlow(f.output, f.ops ++: ops) +} + +/** + * Flow with attached output, can be used as a `Subscriber`. + */ +final case class SubscriberFlow[-In, +Out](output: Output[Out], ops: List[AstNode]) extends HasOpenInput[In, Out] { + type Repr[-In, +Out] = SubscriberFlow[In, Out] + type AfterCloseInput[-In, +Out] = RunnableFlow[In, Out] + + def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = RunnableFlow(in, output, ops) + def withoutOutput: ProcessorFlow[In, Out] = ProcessorFlow(ops) + + override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = + SubscriberFlow(output, ops ::: f.ops) + override def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = + RunnableFlow(f.input, output, ops ::: f.ops) +} + +/** + * Flow with attached input, can be used as a `Publisher`. + */ +final case class PublisherFlow[-In, +Out](input: Input[In], ops: List[AstNode]) extends HasOpenOutput[In, Out] { + override type Repr[-In, +Out] = PublisherFlow[In, Out] + type AfterCloseOutput[-In, +Out] = RunnableFlow[In, Out] + + override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) + + def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = RunnableFlow(input, out, ops) + def withoutInput: ProcessorFlow[In, Out] = ProcessorFlow(ops) + + override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = PublisherFlow(input, f.ops ++: ops) + override def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = + RunnableFlow(input, f.output, f.ops ++: ops) + +} + +/** + * Flow with attached input and output, can be executed. + */ +final case class RunnableFlow[-In, +Out](input: Input[In], output: Output[Out], ops: List[AstNode]) extends Flow { + def withoutOutput: PublisherFlow[In, Out] = PublisherFlow(input, ops) + def withoutInput: SubscriberFlow[In, Out] = SubscriberFlow(output, ops) + + // FIXME + def run()(implicit materializer: FlowMaterializer): Unit = + produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize)) + + // FIXME replace with run and input/output factories + def toPublisher[U >: Out]()(implicit materializer: FlowMaterializer): Publisher[U] = + input match { + case PublisherIn(p) ⇒ materializer.toPublisher(ExistingPublisher(p), ops) + case IterableIn(iter) ⇒ materializer.toPublisher(IterablePublisherNode(iter), ops) + case _ ⇒ ??? + } + + def produceTo(subscriber: Subscriber[_ >: Out])(implicit materializer: FlowMaterializer): Unit = + toPublisher().subscribe(subscriber.asInstanceOf[Subscriber[Out]]) + +} + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala new file mode 100644 index 0000000000..8bb91e81d5 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.concurrent.duration.FiniteDuration +import akka.actor.ActorRefFactory +import akka.stream.impl2.ActorBasedFlowMaterializer +import akka.stream.impl2.Ast +import org.reactivestreams.{ Publisher, Subscriber } +import scala.concurrent.duration._ +import akka.actor.Deploy +import akka.actor.ExtendedActorSystem +import akka.actor.ActorContext +import akka.stream.impl2.StreamSupervisor +import akka.stream.impl2.FlowNameCounter +import akka.stream.MaterializerSettings + +object FlowMaterializer { + + /** + * Scala API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = { + val system = context match { + case s: ExtendedActorSystem ⇒ s + case c: ActorContext ⇒ c.system + case null ⇒ throw new IllegalArgumentException("ActorRefFactory context must be defined") + case _ ⇒ throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, " + + "got [${_contex.getClass.getName}]") + } + + new ActorBasedFlowMaterializer( + settings, + context.actorOf(StreamSupervisor.props(settings).withDispatcher(settings.dispatcher)), + FlowNameCounter(system).counter, + namePrefix.getOrElse("flow")) + } + + /** + * Java API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + */ + def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer = + apply(settings)(context) +} + +/** + * A FlowMaterializer takes the list of transformations comprising a + * [[akka.stream.scaladsl.Flow]] and materializes them in the form of + * [[org.reactivestreams.Processor]] instances. How transformation + * steps are split up into asynchronous regions is implementation + * dependent. + */ +abstract class FlowMaterializer(val settings: MaterializerSettings) { + + /** + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. + */ + def withNamePrefix(name: String): FlowMaterializer + + /** + * INTERNAL API + * ops are stored in reverse order + */ + private[akka] def toPublisher[I, O](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Publisher[O] + +} + diff --git a/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala deleted file mode 100644 index 4e3f60bae8..0000000000 --- a/akka-stream/src/test/scala/akka/stream/dsl/CombinatorSpec.scala +++ /dev/null @@ -1,94 +0,0 @@ -package akka.stream.dsl - -import org.scalatest.Matchers -import org.scalatest.WordSpec -import scala.collection.immutable -import scala.concurrent.duration._ -import scala.concurrent.Future -import akka.stream.OverflowStrategy - -class CombinatorSpec extends WordSpec with Matchers { - val f = From[Int] - - "Linear simple combinators in Flow" should { - "map" in { - val t: OpenFlow[Int, Int] = f.map(_ * 2) - } - "mapFuture" in { - import scala.concurrent.ExecutionContext.Implicits.global - val t: OpenFlow[Int, Int] = f.mapFuture(Future(_)) - } - "filter" in { - val t: OpenFlow[Int, Int] = f.filter(_ != 2) - } - "collect" in { - val t: OpenFlow[Int, String] = f.collect { - case i: Int if i == 2 ⇒ "two" - } - } - "fold" in { - val fo = FoldOut("elements:") { (soFar, element: Int) ⇒ soFar + element } - val t: OpenInputFlow[Int, Int] = f.withOutput(fo) - } - "drop" in { - val t: OpenFlow[Int, Int] = f.drop(2) - } - "dropWithin" in { - val t: OpenFlow[Int, Int] = f.dropWithin(2.seconds) - } - "take" in { - val t: OpenFlow[Int, Int] = f.take(2) - } - "takeWithin" in { - val t: OpenFlow[Int, Int] = f.takeWithin(2.seconds) - } - "grouped" in { - val t: OpenFlow[Int, immutable.Seq[Int]] = f.grouped(2) - } - "groupedWithin" in { - val t: OpenFlow[Int, immutable.Seq[Int]] = f.groupedWithin(2, 2.seconds) - } - "mapConcat" in { - val t: OpenFlow[Int, Int] = f.mapConcat { i ⇒ immutable.Seq(i, i, i) } - } - "conflate" in { - val t: OpenFlow[Int, String] = f.conflate(_.toString, (soFar: String, i) ⇒ soFar + i) - } - "expand" in { - val t: OpenFlow[Int, String] = f.expand(_.toString, (soFar: String) ⇒ (soFar, "_")) - } - "buffer" in { - val t: OpenFlow[Int, Int] = f.buffer(100, OverflowStrategy.DropHead) - } - } - - "Linear combinators which produce multiple flows" should { - "prefixAndTail" in { - val t: OpenFlow[Int, (immutable.Seq[String], OpenOutputFlow[String, String])] = - f.map(_.toString).prefixAndTail(10) - } - "groupBy" in { - val grouped: OpenOutputFlow[Int, (String, OpenOutputFlow[Int, Int])] = - From(immutable.Seq(1, 2, 3)).map(_ * 2).groupBy((o: Int) ⇒ o.toString) - - val closedInner: OpenOutputFlow[Int, (String, ClosedFlow[Int, Int])] = grouped.map { - case (key, openFlow) ⇒ (key, openFlow.withOutput(PublisherOut())) - } - - // both of these compile, even if `grouped` has inner flows unclosed - grouped.withOutput(PublisherOut()).run - closedInner.withOutput(PublisherOut()).run - } - "splitWhen" in { - val t: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) - } - } - - "Linear combinators which consume multiple flows" should { - "flatten" in { - val split: OpenFlow[Int, OpenOutputFlow[String, String]] = f.map(_.toString).splitWhen(_.length > 2) - val flattened: OpenFlow[Int, String] = split.flatten(FlattenStrategy.concatOpenOutputFlow) - } - } - -} diff --git a/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala deleted file mode 100644 index fe6cee34dd..0000000000 --- a/akka-stream/src/test/scala/akka/stream/dsl/FlowSpec.scala +++ /dev/null @@ -1,141 +0,0 @@ -package akka.stream.dsl - -import org.scalatest.{ Matchers, WordSpec } - -import scala.collection.immutable.Seq -import scala.concurrent.Future - -class FlowSpec extends WordSpec with Matchers { - - val intSeq = IterableIn(Seq(1, 2, 3)) - val strSeq = IterableIn(Seq("a", "b", "c")) - - import scala.concurrent.ExecutionContext.Implicits.global - val intFut = FutureIn(Future { 3 }) - - "OpenFlow" should { - "go through all states" in { - val f: OpenFlow[Int, Int] = From[Int] - .withInput(intSeq) - .withOutput(PublisherOut()) - .withoutInput - .withoutOutput - } - "should not run" in { - val open: OpenFlow[Int, Int] = From[Int] - "open.run" shouldNot compile - } - "accept IterableIn" in { - val f: OpenOutputFlow[Int, Int] = From[Int].withInput(intSeq) - } - "accept FutureIn" in { - val f: OpenOutputFlow[Int, Int] = From[Int].withInput(intFut) - } - "append OpenFlow" in { - val open1: OpenFlow[Int, String] = From[Int].map(_.toString) - val open2: OpenFlow[String, Int] = From[String].map(_.hashCode) - val open3: OpenFlow[Int, Int] = open1.append(open2) - "open3.run" shouldNot compile - - val closedInput: OpenOutputFlow[Int, Int] = open3.withInput(intSeq) - "closedInput.run" shouldNot compile - - val closedOutput: OpenInputFlow[Int, Int] = open3.withOutput(PublisherOut()) - "closedOutput.run" shouldNot compile - - closedInput.withOutput(PublisherOut()).run - closedOutput.withInput(intSeq).run - } - "prepend OpenFlow" in { - val open1: OpenFlow[Int, String] = From[Int].map(_.toString) - val open2: OpenFlow[String, Int] = From[String].map(_.hashCode) - val open3: OpenFlow[String, String] = open1.prepend(open2) - "open3.run" shouldNot compile - - val closedInput: OpenOutputFlow[String, String] = open3.withInput(strSeq) - "closedInput.run" shouldNot compile - - val closedOutput: OpenInputFlow[String, String] = open3.withOutput(PublisherOut()) - "closedOutput.run" shouldNot compile - - closedInput.withOutput(PublisherOut()).run - closedOutput.withInput(strSeq).run - } - "append OpenInputFlow" in { - val open: OpenFlow[Int, String] = From[Int].map(_.toString) - val closedOutput: OpenInputFlow[String, Int] = From[String].map(_.hashCode).withOutput(PublisherOut()) - val appended: OpenInputFlow[Int, Int] = open.append(closedOutput) - "appended.run" shouldNot compile - "appended.toFuture" shouldNot compile - appended.withInput(intSeq).run - } - "prepend OpenOutputFlow" in { - val open: OpenFlow[Int, String] = From[Int].map(_.toString) - val closedInput: OpenOutputFlow[String, Int] = From[String].map(_.hashCode).withInput(strSeq) - val prepended: OpenOutputFlow[String, String] = open.prepend(closedInput) - "prepended.run" shouldNot compile - "prepended.withInput(strSeq)" shouldNot compile - prepended.withOutput(PublisherOut()).run - } - } - - "OpenInputFlow" should { - val openInput: OpenInputFlow[Int, String] = - From[Int].map(_.toString).withOutput(PublisherOut()) - "accept Input" in { - openInput.withInput(intSeq) - } - "drop Output" in { - openInput.withoutOutput - } - "not drop Input" in { - "openInput.withoutInput" shouldNot compile - } - "not accept Output" in { - "openInput.ToFuture" shouldNot compile - } - "not run" in { - "openInput.run" shouldNot compile - } - } - - "OpenOutputFlow" should { - val openOutput: OpenOutputFlow[Int, String] = - From(Seq(1, 2, 3)).map(_.toString) - "accept Output" in { - openOutput.withOutput(PublisherOut()) - } - "drop Input" in { - openOutput.withoutInput - } - "not drop Output" in { - "openOutput.withoutOutput" shouldNot compile - } - "not accept Input" in { - "openOutput.withInput(intSeq)" shouldNot compile - } - "not run" in { - "openOutput.run" shouldNot compile - } - } - - "ClosedFlow" should { - val closed: ClosedFlow[Int, String] = - From(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut()) - "run" in { - closed.run - } - "drop Input" in { - closed.withoutInput - } - "drop Output" in { - closed.withoutOutput - } - "not accept Input" in { - "closed.withInput(intSeq)" shouldNot compile - } - "not accept Output" in { - "closed.ToFuture" shouldNot compile - } - } -} diff --git a/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala b/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala deleted file mode 100644 index 46647af26f..0000000000 --- a/akka-stream/src/test/scala/akka/stream/dsl/GraphSpec.scala +++ /dev/null @@ -1,57 +0,0 @@ -package akka.stream.dsl - -import org.scalatest.{ WordSpec, Matchers } - -import scala.collection.immutable.Seq - -class GraphSpec extends WordSpec with Matchers { - - "Graph" should { - "merge" in { - val merge = Merge[Int, Int, Int]() - - val in1 = From[Int].withOutput(merge.in1) - val in2 = From[Int].withOutput(merge.in2) - val out1 = From[Int].withInput(merge.out) - - val out2 = From[String] - // FIXME: make me not compile - //"out2.withInput(merge.out)" shouldNot compile - } - "zip" in { - val zip = Zip[Int, String]() - - val in1 = From[Int].withOutput(zip.in1) - val in2 = From[String].withOutput(zip.in2) - val out1 = From[(Int, String)].withInput(zip.out) - - val out2 = From[(String, Int)] - // FIXME: make me not compile - //"out2.withInput(zip.out)" shouldNot compile - } - "concat" in { - trait A - trait B extends A - - val concat = Concat[A, B, A]() - val in1 = From[A].withOutput(concat.in1) - val in2 = From[B].withOutput(concat.in2) - val out1 = From[A].withInput(concat.out) - - val out2 = From[String] - // FIXME: make me not compile - //"out2.withInput(concat.out)" shouldNot compile - } - "broadcast" in { - val broadcast = Broadcast[Int]() - - val in1 = From[Int].withOutput(broadcast.in) - val in2 = From[Int].withInput(broadcast.out1) - val out1 = From[Int].withInput(broadcast.out2) - - val out2 = From[String] - // FIXME: make me not compile - //"out2.withInput(broadcast.out2)" shouldNot compile - } - } -} diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala new file mode 100644 index 0000000000..57440f2e66 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import org.scalatest.Matchers +import org.scalatest.WordSpec +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.concurrent.Future +import akka.stream.OverflowStrategy + +class CombinatorSpec extends WordSpec with Matchers { + val f = FlowFrom[Int] + + "Linear simple combinators in Flow" should { + "map" in { + val t: ProcessorFlow[Int, Int] = f.map(_ * 2) + } + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala new file mode 100644 index 0000000000..e0da6678ab --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala @@ -0,0 +1,146 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import org.scalatest.{ Matchers, WordSpec } +import scala.collection.immutable.Seq +import scala.concurrent.Future +import akka.stream.testkit.AkkaSpec +import akka.stream.MaterializerSettings + +class FlowSpec extends AkkaSpec { + + val intSeq = IterableIn(Seq(1, 2, 3)) + val strSeq = IterableIn(Seq("a", "b", "c")) + + import scala.concurrent.ExecutionContext.Implicits.global + val intFut = FutureIn(Future { 3 }) + implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + + "ProcessorFlow" should { + "go through all states" in { + val f: ProcessorFlow[Int, Int] = FlowFrom[Int] + .withInput(intSeq) + .withOutput(PublisherOut()) + .withoutInput + .withoutOutput + } + "should not run" in { + val open: ProcessorFlow[Int, Int] = FlowFrom[Int] + "open.run()" shouldNot compile + } + "accept IterableIn" in { + val f: PublisherFlow[Int, Int] = FlowFrom[Int].withInput(intSeq) + } + "accept FutureIn" in { + val f: PublisherFlow[Int, Int] = FlowFrom[Int].withInput(intFut) + } + "append ProcessorFlow" in { + val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) + val open2: ProcessorFlow[String, Int] = FlowFrom[String].map(_.hashCode) + val open3: ProcessorFlow[Int, Int] = open1.append(open2) + "open3.run()" shouldNot compile + + val closedInput: PublisherFlow[Int, Int] = open3.withInput(intSeq) + "closedInput.run()" shouldNot compile + + val closedOutput: SubscriberFlow[Int, Int] = open3.withOutput(PublisherOut()) + "closedOutput.run()" shouldNot compile + + closedInput.withOutput(PublisherOut()).run() + closedOutput.withInput(intSeq).run() + } + "prepend ProcessorFlow" in { + val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) + val open2: ProcessorFlow[String, Int] = FlowFrom[String].map(_.hashCode) + val open3: ProcessorFlow[String, String] = open1.prepend(open2) + "open3.run()" shouldNot compile + + val closedInput: PublisherFlow[String, String] = open3.withInput(strSeq) + "closedInput.run()" shouldNot compile + + val closedOutput: SubscriberFlow[String, String] = open3.withOutput(PublisherOut()) + "closedOutput.run()" shouldNot compile + + closedInput.withOutput(PublisherOut()).run + closedOutput.withInput(strSeq).run + } + "append SubscriberFlow" in { + val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) + val closedOutput: SubscriberFlow[String, Int] = FlowFrom[String].map(_.hashCode).withOutput(PublisherOut()) + val appended: SubscriberFlow[Int, Int] = open.append(closedOutput) + "appended.run()" shouldNot compile + "appended.toFuture" shouldNot compile + appended.withInput(intSeq).run + } + "prepend PublisherFlow" in { + val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) + val closedInput: PublisherFlow[String, Int] = FlowFrom[String].map(_.hashCode).withInput(strSeq) + val prepended: PublisherFlow[String, String] = open.prepend(closedInput) + "prepended.run()" shouldNot compile + "prepended.withInput(strSeq)" shouldNot compile + prepended.withOutput(PublisherOut()).run + } + } + + "SubscriberFlow" should { + val openInput: SubscriberFlow[Int, String] = + FlowFrom[Int].map(_.toString).withOutput(PublisherOut()) + "accept Input" in { + openInput.withInput(intSeq) + } + "drop Output" in { + openInput.withoutOutput + } + "not drop Input" in { + "openInput.withoutInput" shouldNot compile + } + "not accept Output" in { + "openInput.ToFuture" shouldNot compile + } + "not run()" in { + "openInput.run()" shouldNot compile + } + } + + "PublisherFlow" should { + val openOutput: PublisherFlow[Int, String] = + FlowFrom(Seq(1, 2, 3)).map(_.toString) + "accept Output" in { + openOutput.withOutput(PublisherOut()) + } + "drop Input" in { + openOutput.withoutInput + } + "not drop Output" in { + "openOutput.withoutOutput" shouldNot compile + } + "not accept Input" in { + "openOutput.withInput(intSeq)" shouldNot compile + } + "not run()" in { + "openOutput.run()" shouldNot compile + } + } + + "RunnableFlow" should { + val closed: RunnableFlow[Int, String] = + FlowFrom(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut()) + "run" in { + closed.run() + } + "drop Input" in { + closed.withoutInput + } + "drop Output" in { + closed.withoutOutput + } + "not accept Input" in { + "closed.withInput(intSeq)" shouldNot compile + } + "not accept Output" in { + "closed.ToFuture" shouldNot compile + } + } +} diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala new file mode 100644 index 0000000000..6c32bda6bc --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala @@ -0,0 +1,403 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.testkit.{ EventFilter, TestProbe } +import com.typesafe.config.ConfigFactory +import scala.collection.immutable.Seq +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace +import akka.stream.Transformer +import akka.stream.MaterializerSettings + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { + + implicit val materializer = FlowMaterializer(MaterializerSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 2, + maxFanOutBufferSize = 2, + dispatcher = "akka.test.stream-dispatcher")) + + "A Flow with transform operations" must { + "produce one-to-one transformation as expected" in { + val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + var tot = 0 + override def onNext(elem: Int) = { + tot += elem + List(tot) + } + }). + withOutput(PublisherOut()).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(subscriber) + val subscription = subscriber.expectSubscription() + subscription.request(1) + subscriber.expectNext(1) + subscriber.expectNoMsg(200.millis) + subscription.request(2) + subscriber.expectNext(3) + subscriber.expectNext(6) + subscriber.expectComplete() + } + + "produce one-to-several transformation as expected" in { + val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + var tot = 0 + override def onNext(elem: Int) = { + tot += elem + Vector.fill(elem)(tot) + } + }). + withOutput(PublisherOut()).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(subscriber) + val subscription = subscriber.expectSubscription() + subscription.request(4) + subscriber.expectNext(1) + subscriber.expectNext(3) + subscriber.expectNext(3) + subscriber.expectNext(6) + subscriber.expectNoMsg(200.millis) + subscription.request(100) + subscriber.expectNext(6) + subscriber.expectNext(6) + subscriber.expectComplete() + } + + "produce dropping transformation as expected" in { + val p = FlowFrom(List(1, 2, 3, 4)).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + var tot = 0 + override def onNext(elem: Int) = { + tot += elem + if (elem % 2 == 0) { + Nil + } else { + List(tot) + } + } + }). + withOutput(PublisherOut()).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(subscriber) + val subscription = subscriber.expectSubscription() + subscription.request(1) + subscriber.expectNext(1) + subscriber.expectNoMsg(200.millis) + subscription.request(1) + subscriber.expectNext(6) + subscription.request(1) + subscriber.expectComplete() + } + + "produce multi-step transformation as expected" in { + val p = FlowFrom(List("a", "bc", "def")).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[String, Int] { + var concat = "" + override def onNext(elem: String) = { + concat += elem + List(concat.length) + } + }). + transform("transform", () ⇒ new Transformer[Int, Int] { + var tot = 0 + override def onNext(length: Int) = { + tot += length + List(tot) + } + }). + withOutput(PublisherOut()).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(c1) + val sub1 = c1.expectSubscription() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(c2) + val sub2 = c2.expectSubscription() + sub1.request(1) + sub2.request(2) + c1.expectNext(1) + c2.expectNext(1) + c2.expectNext(4) + c1.expectNoMsg(200.millis) + sub1.request(2) + sub2.request(2) + c1.expectNext(4) + c1.expectNext(10) + c2.expectNext(10) + c1.expectComplete() + c2.expectComplete() + } + + "invoke onComplete when done" in { + val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[String, String] { + var s = "" + override def onNext(element: String) = { + s += element + Nil + } + override def onTermination(e: Option[Throwable]) = List(s + "B") + }). + withOutput(PublisherOut()).toPublisher() + val c = StreamTestKit.SubscriberProbe[String]() + p2.subscribe(c) + val s = c.expectSubscription() + s.request(1) + c.expectNext("aB") + c.expectComplete() + } + + "invoke cleanup when done" in { + val cleanupProbe = TestProbe() + val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[String, String] { + var s = "" + override def onNext(element: String) = { + s += element + Nil + } + override def onTermination(e: Option[Throwable]) = List(s + "B") + override def cleanup() = cleanupProbe.ref ! s + }). + withOutput(PublisherOut()).toPublisher() + val c = StreamTestKit.SubscriberProbe[String]() + p2.subscribe(c) + val s = c.expectSubscription() + s.request(1) + c.expectNext("aB") + c.expectComplete() + cleanupProbe.expectMsg("a") + } + + "invoke cleanup when done consume" in { + val cleanupProbe = TestProbe() + val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() + FlowFrom(p). + transform("transform", () ⇒ new Transformer[String, String] { + var s = "x" + override def onNext(element: String) = { + s = element + List(element) + } + override def cleanup() = cleanupProbe.ref ! s + }). + withOutput(BlackholeOut()).run() + cleanupProbe.expectMsg("a") + } + + "invoke cleanup when done after error" in { + val cleanupProbe = TestProbe() + val p = FlowFrom(List("a", "b", "c")).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[String, String] { + var s = "" + override def onNext(in: String) = { + if (in == "b") { + throw new IllegalArgumentException("Not b") with NoStackTrace + } else { + val out = s + in + s += in.toUpperCase + List(out) + } + } + override def onTermination(e: Option[Throwable]) = List(s + "B") + override def cleanup() = cleanupProbe.ref ! s + }). + withOutput(PublisherOut()).toPublisher() + val c = StreamTestKit.SubscriberProbe[String]() + p2.subscribe(c) + val s = c.expectSubscription() + s.request(1) + c.expectNext("a") + s.request(1) + c.expectError() + cleanupProbe.expectMsg("A") + } + + "allow cancellation using isComplete" in { + val p = StreamTestKit.PublisherProbe[Int]() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + var s = "" + override def onNext(element: Int) = { + s += element + List(element) + } + override def isComplete = s == "1" + }). + withOutput(PublisherOut()).toPublisher() + val proc = p.expectSubscription + val c = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(c) + val s = c.expectSubscription() + s.request(10) + proc.sendNext(1) + proc.sendNext(2) + c.expectNext(1) + c.expectComplete() + proc.expectCancellation() + } + + "call onComplete after isComplete signaled completion" in { + val cleanupProbe = TestProbe() + val p = StreamTestKit.PublisherProbe[Int]() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + var s = "" + override def onNext(element: Int) = { + s += element + List(element) + } + override def isComplete = s == "1" + override def onTermination(e: Option[Throwable]) = List(s.length + 10) + override def cleanup() = cleanupProbe.ref ! s + }). + withOutput(PublisherOut()).toPublisher() + val proc = p.expectSubscription + val c = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(c) + val s = c.expectSubscription() + s.request(10) + proc.sendNext(1) + proc.sendNext(2) + c.expectNext(1) + c.expectNext(11) + c.expectComplete() + proc.expectCancellation() + cleanupProbe.expectMsg("1") + } + + "report error when exception is thrown" in { + val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + override def onNext(elem: Int) = { + if (elem == 2) { + throw new IllegalArgumentException("two not allowed") + } else { + List(elem, elem) + } + } + }). + withOutput(PublisherOut()).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(subscriber) + val subscription = subscriber.expectSubscription() + EventFilter[IllegalArgumentException]("two not allowed") intercept { + subscription.request(100) + subscriber.expectNext(1) + subscriber.expectNext(1) + subscriber.expectError().getMessage should be("two not allowed") + subscriber.expectNoMsg(200.millis) + } + } + + "support cancel as expected" in { + val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + override def onNext(elem: Int) = List(elem, elem) + }). + withOutput(PublisherOut()).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(subscriber) + val subscription = subscriber.expectSubscription() + subscription.request(2) + subscriber.expectNext(1) + subscription.cancel() + subscriber.expectNext(1) + subscriber.expectNoMsg(500.millis) + subscription.request(2) + subscriber.expectNoMsg(200.millis) + } + + "support producing elements from empty inputs" in { + val p = FlowFrom(List.empty[Int]).withOutput(PublisherOut()).toPublisher() + val p2 = FlowFrom(p). + transform("transform", () ⇒ new Transformer[Int, Int] { + override def onNext(elem: Int) = Nil + override def onTermination(e: Option[Throwable]) = List(1, 2, 3) + }). + withOutput(PublisherOut()).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + p2.subscribe(subscriber) + val subscription = subscriber.expectSubscription() + subscription.request(4) + subscriber.expectNext(1) + subscriber.expectNext(2) + subscriber.expectNext(3) + subscriber.expectComplete() + + } + + "support converting onComplete into onError" in { + val subscriber = StreamTestKit.SubscriberProbe[Int]() + FlowFrom(List(5, 1, 2, 3)).transform("transform", () ⇒ new Transformer[Int, Int] { + var expectedNumberOfElements: Option[Int] = None + var count = 0 + override def onNext(elem: Int) = + if (expectedNumberOfElements.isEmpty) { + expectedNumberOfElements = Some(elem) + Nil + } else { + count += 1 + List(elem) + } + override def onTermination(err: Option[Throwable]) = err match { + case Some(e) ⇒ Nil + case None ⇒ + expectedNumberOfElements match { + case Some(expected) if count != expected ⇒ + throw new RuntimeException(s"Expected $expected, got $count") with NoStackTrace + case _ ⇒ Nil + } + } + }).withOutput(PublisherOut()).produceTo(subscriber) + + val subscription = subscriber.expectSubscription() + subscription.request(10) + + subscriber.expectNext(1) + subscriber.expectNext(2) + subscriber.expectNext(3) + subscriber.expectError().getMessage should be("Expected 5, got 3") + } + + "be safe to reuse" in { + val flow = FlowFrom(1 to 3).transform("transform", () ⇒ + new Transformer[Int, Int] { + var count = 0 + + override def onNext(elem: Int): Seq[Int] = { + count += 1 + List(count) + } + }).withOutput(PublisherOut()) + + val s1 = StreamTestKit.SubscriberProbe[Int]() + flow.produceTo(s1) + s1.expectSubscription().request(3) + s1.expectNext(1, 2, 3) + s1.expectComplete() + + val s2 = StreamTestKit.SubscriberProbe[Int]() + flow.produceTo(s2) + s2.expectSubscription().request(3) + s2.expectNext(1, 2, 3) + s2.expectComplete() + } + } + +}