diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 0a7ba084f4..c1e2b94af0 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -9,6 +9,7 @@ import akka.stream.impl.ActorBasedFlowMaterializer import akka.stream.impl.Ast import org.reactivestreams.api.Producer import scala.concurrent.duration._ +import org.reactivestreams.api.Consumer object FlowMaterializer { /** @@ -40,6 +41,21 @@ trait FlowMaterializer { */ private[akka] def consume[I](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Unit + /** + * INTERNAL API + */ + private[akka] def ductProduceTo[In, Out](consumer: Consumer[Out], ops: List[Ast.AstNode]): Consumer[In] + + /** + * INTERNAL API + */ + private[akka] def ductConsume[In](ops: List[Ast.AstNode]): Consumer[In] + + /** + * INTERNAL API + */ + private[akka] def ductBuild[In, Out](ops: List[Ast.AstNode]): (Consumer[In], Producer[Out]) + } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 3446337628..69e98f353a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -115,15 +115,20 @@ private[akka] class ActorBasedFlowMaterializer(settings: MaterializerSettings, _ } } - private val identityConsumer = Transform( + private val blackholeTransform = Transform( new Transformer[Any, Any] { override def onNext(element: Any) = Nil }) + private val identityTransform = Transform( + new Transformer[Any, Any] { + override def onNext(element: Any) = List(element) + }) + override def consume[I](producerNode: ProducerNode[I], ops: List[AstNode]): Unit = { val consumer = ops match { case Nil ⇒ - new ActorConsumer[Any](context.actorOf(ActorConsumer.props(settings, identityConsumer))) + new ActorConsumer[Any](context.actorOf(ActorConsumer.props(settings, blackholeTransform))) case head :: tail ⇒ val c = new ActorConsumer[Any](context.actorOf(ActorConsumer.props(settings, head))) processorChain(c, tail) @@ -133,4 +138,21 @@ private[akka] class ActorBasedFlowMaterializer(settings: MaterializerSettings, _ def processorForNode(op: AstNode): Processor[Any, Any] = new ActorProcessor(context.actorOf(ActorProcessor.props(settings, op))) + override def ductProduceTo[In, Out](consumer: Consumer[Out], ops: List[Ast.AstNode]): Consumer[In] = + processorChain(consumer, ops).asInstanceOf[Consumer[In]] + + override def ductConsume[In](ops: List[Ast.AstNode]): Consumer[In] = + ductProduceTo(new ActorConsumer[Any](context.actorOf(ActorConsumer.props(settings, blackholeTransform))), ops) + + override def ductBuild[In, Out](ops: List[Ast.AstNode]): (Consumer[In], Producer[Out]) = { + if (ops.isEmpty) { + val identityProcessor: Processor[In, Out] = processorForNode(identityTransform).asInstanceOf[Processor[In, Out]] + (identityProcessor, identityProcessor) + } else { + val outProcessor = processorForNode(ops.head).asInstanceOf[Processor[In, Out]] + val topConsumer = processorChain(outProcessor, ops.tail).asInstanceOf[Processor[In, Out]] + (topConsumer, outProcessor) + } + } + } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index b40e9c356a..4eec112bc3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -16,138 +16,19 @@ import scala.util.Failure import akka.stream.scaladsl.Transformer import akka.stream.scaladsl.RecoveryTransformer import org.reactivestreams.api.Consumer +import akka.stream.scaladsl.Duct /** * INTERNAL API */ -private[akka] object FlowImpl { - private val SuccessUnit = Success[Unit](()) - private val ListOfUnit = List(()) - - val takeCompletedTransformer: Transformer[Any, Any] = new Transformer[Any, Any] { - override def onNext(elem: Any) = Nil - override def isComplete = true - } - - val identityTransformer: Transformer[Any, Any] = new Transformer[Any, Any] { - override def onNext(elem: Any) = List(elem) - } -} - -/** - * INTERNAL API - */ -private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]) extends Flow[O] { +private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]) extends Flow[O] with Builder[O] { import FlowImpl._ import Ast._ + + type Thing[T] = Flow[T] + // Storing ops in reverse order - private def andThen[U](op: AstNode): Flow[U] = this.copy(ops = op :: ops) - - override def map[U](f: O ⇒ U): Flow[U] = - transform(new Transformer[O, U] { - override def onNext(in: O) = List(f(in)) - }) - - override def filter(p: O ⇒ Boolean): Flow[O] = - transform(new Transformer[O, O] { - override def onNext(in: O) = if (p(in)) List(in) else Nil - }) - - override def collect[U](pf: PartialFunction[O, U]): Flow[U] = - transform(new Transformer[O, U] { - override def onNext(in: O) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil - }) - - override def foreach(c: O ⇒ Unit): Flow[Unit] = - transform(new Transformer[O, Unit] { - override def onNext(in: O) = { c(in); Nil } - override def onComplete() = ListOfUnit - }) - - override def fold[U](zero: U)(f: (U, O) ⇒ U): Flow[U] = - transform(new FoldTransformer[U](zero, f)) - - // Without this class compiler complains about - // "Parameter type in structural refinement may not refer to an abstract type defined outside that refinement" - class FoldTransformer[S](var state: S, f: (S, O) ⇒ S) extends Transformer[O, S] { - override def onNext(in: O): immutable.Seq[S] = { state = f(state, in); Nil } - override def onComplete(): immutable.Seq[S] = List(state) - } - - override def drop(n: Int): Flow[O] = - transform(new Transformer[O, O] { - var delegate: Transformer[O, O] = - if (n == 0) identityTransformer.asInstanceOf[Transformer[O, O]] - else new Transformer[O, O] { - var c = n - override def onNext(in: O) = { - c -= 1 - if (c == 0) - delegate = identityTransformer.asInstanceOf[Transformer[O, O]] - Nil - } - } - - override def onNext(in: O) = delegate.onNext(in) - }) - - override def take(n: Int): Flow[O] = - transform(new Transformer[O, O] { - var delegate: Transformer[O, O] = - if (n == 0) takeCompletedTransformer.asInstanceOf[Transformer[O, O]] - else new Transformer[O, O] { - var c = n - override def onNext(in: O) = { - c -= 1 - if (c == 0) - delegate = takeCompletedTransformer.asInstanceOf[Transformer[O, O]] - List(in) - } - - override def isComplete = c == 0 - } - - override def onNext(in: O) = delegate.onNext(in) - override def isComplete = delegate.isComplete - }) - - override def grouped(n: Int): Flow[immutable.Seq[O]] = - transform(new Transformer[O, immutable.Seq[O]] { - var buf: Vector[O] = Vector.empty - override def onNext(in: O) = { - buf :+= in - if (buf.size == n) { - val group = buf - buf = Vector.empty - List(group) - } else - Nil - } - override def onComplete() = if (buf.isEmpty) Nil else List(buf) - }) - - override def mapConcat[U](f: O ⇒ immutable.Seq[U]): Flow[U] = - transform(new Transformer[O, U] { - override def onNext(in: O) = f(in) - }) - - override def transform[U](transformer: Transformer[O, U]): Flow[U] = - andThen(Transform(transformer.asInstanceOf[Transformer[Any, Any]])) - - override def transformRecover[U](recoveryTransformer: RecoveryTransformer[O, U]): Flow[U] = - andThen(Recover(recoveryTransformer.asInstanceOf[RecoveryTransformer[Any, Any]])) - - override def zip[O2](other: Producer[O2]): Flow[(O, O2)] = andThen(Zip(other.asInstanceOf[Producer[Any]])) - - override def concat[U >: O](next: Producer[U]): Flow[U] = andThen(Concat(next.asInstanceOf[Producer[Any]])) - - override def merge[U >: O](other: Producer[U]): Flow[U] = andThen(Merge(other.asInstanceOf[Producer[Any]])) - - override def splitWhen(p: (O) ⇒ Boolean): Flow[Producer[O]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) - - override def groupBy[K](f: (O) ⇒ K): Flow[(K, Producer[O])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) - - override def tee(other: Consumer[_ >: O]): Flow[O] = andThen(Tee(other.asInstanceOf[Consumer[Any]])) + override protected def andThen[U](op: Ast.AstNode): Flow[U] = this.copy(ops = op :: ops) override def toFuture(materializer: FlowMaterializer): Future[O] = { val p = Promise[O]() @@ -163,7 +44,7 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: override def consume(materializer: FlowMaterializer): Unit = materializer.consume(producerNode, ops) - def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Unit = + override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Unit = transformRecover(new RecoveryTransformer[O, Unit] { var ok = true override def onNext(in: O) = Nil @@ -172,12 +53,181 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: ok = false Nil } - override def onComplete() = { if (ok) callback(SuccessUnit); Nil } + override def onComplete() = { if (ok) callback(Builder.SuccessUnit); Nil } }).consume(materializer) override def toProducer(materializer: FlowMaterializer): Producer[O] = materializer.toProducer(producerNode, ops) - override def produceTo(materializer: FlowMaterializer, consumer: Consumer[O]) = - toProducer(materializer).produceTo(consumer) + override def produceTo(materializer: FlowMaterializer, consumer: Consumer[_ >: O]) = + toProducer(materializer).produceTo(consumer.asInstanceOf[Consumer[O]]) +} + +/** + * INTERNAL API + */ +private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[In, Out] with Builder[Out] { + + type Thing[T] = Duct[In, T] + + // Storing ops in reverse order + override protected def andThen[U](op: Ast.AstNode): Duct[In, U] = this.copy(ops = op :: ops) + + override def produceTo(materializer: FlowMaterializer, consumer: Consumer[Out]): Consumer[In] = + materializer.ductProduceTo(consumer, ops) + + override def consume(materializer: FlowMaterializer): Consumer[In] = + materializer.ductConsume(ops) + + override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Consumer[In] = + transformRecover(new RecoveryTransformer[Out, Unit] { + var ok = true + override def onNext(in: Out) = Nil + override def onError(e: Throwable) = { + callback(Failure(e)) + ok = false + Nil + } + override def onComplete() = { if (ok) callback(Builder.SuccessUnit); Nil } + }).consume(materializer) + + override def build(materializer: FlowMaterializer): (Consumer[In], Producer[Out]) = + materializer.ductBuild(ops) + +} + +/** + * INTERNAL API + */ +private[akka] object Builder { + val SuccessUnit = Success[Unit](()) + private val ListOfUnit = List(()) + + private val takeCompletedTransformer: Transformer[Any, Any] = new Transformer[Any, Any] { + override def onNext(elem: Any) = Nil + override def isComplete = true + } + + private val identityTransformer: Transformer[Any, Any] = new Transformer[Any, Any] { + override def onNext(elem: Any) = List(elem) + } +} + +/** + * INTERNAL API + * Builder of `Flow` or `Duct` things + */ +private[akka] trait Builder[Out] { + import Builder._ + import akka.stream.impl.Ast._ + import scala.language.higherKinds + + type Thing[T] + + protected def andThen[U](op: Ast.AstNode): Thing[U] + + def map[U](f: Out ⇒ U): Thing[U] = + transform(new Transformer[Out, U] { + override def onNext(in: Out) = List(f(in)) + }) + + def filter(p: Out ⇒ Boolean): Thing[Out] = + transform(new Transformer[Out, Out] { + override def onNext(in: Out) = if (p(in)) List(in) else Nil + }) + + def collect[U](pf: PartialFunction[Out, U]): Thing[U] = + transform(new Transformer[Out, U] { + override def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil + }) + + def foreach(c: Out ⇒ Unit): Thing[Unit] = + transform(new Transformer[Out, Unit] { + override def onNext(in: Out) = { c(in); Nil } + override def onComplete() = ListOfUnit + }) + + def fold[U](zero: U)(f: (U, Out) ⇒ U): Thing[U] = + transform(new FoldTransformer[U](zero, f)) + + // Without this class compiler complains about + // "Parameter type in structural refinement may not refer to an abstract type defined outside that refinement" + class FoldTransformer[S](var state: S, f: (S, Out) ⇒ S) extends Transformer[Out, S] { + override def onNext(in: Out): immutable.Seq[S] = { state = f(state, in); Nil } + override def onComplete(): immutable.Seq[S] = List(state) + } + + def drop(n: Int): Thing[Out] = + transform(new Transformer[Out, Out] { + var delegate: Transformer[Out, Out] = + if (n == 0) identityTransformer.asInstanceOf[Transformer[Out, Out]] + else new Transformer[Out, Out] { + var c = n + override def onNext(in: Out) = { + c -= 1 + if (c == 0) + delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]] + Nil + } + } + + override def onNext(in: Out) = delegate.onNext(in) + }) + + def take(n: Int): Thing[Out] = + transform(new Transformer[Out, Out] { + var delegate: Transformer[Out, Out] = + if (n == 0) takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] + else new Transformer[Out, Out] { + var c = n + override def onNext(in: Out) = { + c -= 1 + if (c == 0) + delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] + List(in) + } + } + + override def onNext(in: Out) = delegate.onNext(in) + override def isComplete = delegate.isComplete + }) + + def grouped(n: Int): Thing[immutable.Seq[Out]] = + transform(new Transformer[Out, immutable.Seq[Out]] { + var buf: Vector[Out] = Vector.empty + override def onNext(in: Out) = { + buf :+= in + if (buf.size == n) { + val group = buf + buf = Vector.empty + List(group) + } else + Nil + } + override def onComplete() = if (buf.isEmpty) Nil else List(buf) + }) + + def mapConcat[U](f: Out ⇒ immutable.Seq[U]): Thing[U] = + transform(new Transformer[Out, U] { + override def onNext(in: Out) = f(in) + }) + + def transform[U](transformer: Transformer[Out, U]): Thing[U] = + andThen(Transform(transformer.asInstanceOf[Transformer[Any, Any]])) + + def transformRecover[U](recoveryTransformer: RecoveryTransformer[Out, U]): Thing[U] = + andThen(Recover(recoveryTransformer.asInstanceOf[RecoveryTransformer[Any, Any]])) + + def zip[O2](other: Producer[O2]): Thing[(Out, O2)] = andThen(Zip(other.asInstanceOf[Producer[Any]])) + + def concat[U >: Out](next: Producer[U]): Thing[U] = andThen(Concat(next.asInstanceOf[Producer[Any]])) + + def merge[U >: Out](other: Producer[U]): Thing[U] = andThen(Merge(other.asInstanceOf[Producer[Any]])) + + def splitWhen(p: (Out) ⇒ Boolean): Thing[Producer[Out]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) + + def groupBy[K](f: (Out) ⇒ K): Thing[(K, Producer[Out])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) + + def tee(other: Consumer[_ >: Out]): Thing[Out] = andThen(Tee(other.asInstanceOf[Consumer[Any]])) + } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala new file mode 100644 index 0000000000..30b76c1e46 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala @@ -0,0 +1,234 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl + +import scala.collection.immutable +import akka.stream.impl.DuctImpl +import org.reactivestreams.api.Consumer +import akka.stream.FlowMaterializer +import scala.annotation.unchecked.uncheckedVariance +import org.reactivestreams.api.Producer +import scala.util.Try + +object Duct { + + private val empty = DuctImpl[Any, Any](Nil) + + /** + * Create an empty [[Duct]]. The transformation steps are executed by a series + * of [[org.reactivestreams.api.Processor]] instances that mediate the flow of + * elements downstream and the propagation of back-pressure upstream. + */ + def apply[In]: Duct[In, In] = empty.asInstanceOf[Duct[In, In]] + +} + +/** + * A `Duct` provides the same kind of formulation of stream transformations as a [[Flow]]. + * The difference is that it is not attached to an input source. + * + * The pipeline must be materialized by calling the [[#produceTo]], [[#consume]] or [[#build]] + * methods on it and then attach the `Consumer` representing the input side of the `Duct` to an + * upstream `Producer`. + * + */ +trait Duct[In, +Out] { + /** + * Transform this stream by applying the given function to each of the elements + * as they pass through this processing step. + */ + def map[U](f: Out ⇒ U): Duct[In, U] + + /** + * Only pass on those elements that satisfy the given predicate. + */ + def filter(p: Out ⇒ Boolean): Duct[In, Out] + + /** + * Transform this stream by applying the given partial function to each of the elements + * on which the function is defined as they pass through this processing step. + * Non-matching elements are filtered out. + */ + def collect[U](pf: PartialFunction[Out, U]): Duct[In, U] + + /** + * Invoke the given procedure for each received element and produce a Unit value + * upon reaching the normal end of the stream. Please note that also in this case + * the `Duct` needs to be materialized (e.g. using [[#consume]] and attaching the + * the `Consumer` representing the input side of the `Duct` to an upstream + * `Producer`) to initiate its execution. + */ + def foreach(c: Out ⇒ Unit): Duct[In, Unit] + + /** + * Invoke the given function for every received element, giving it its previous + * output (or the given “zero” value) and the element as input. The returned stream + * will receive the return value of the final function evaluation when the input + * stream ends. + */ + def fold[U](zero: U)(f: (U, Out) ⇒ U): Duct[In, U] + + /** + * Discard the given number of elements at the beginning of the stream. + */ + def drop(n: Int): Duct[In, Out] + + /** + * Terminate processing (and cancel the upstream producer) after the given + * number of elements. Due to input buffering some elements may have been + * requested from upstream producers that will then not be processed downstream + * of this step. + */ + def take(n: Int): Duct[In, Out] + + /** + * Chunk up this stream into groups of the given size, with the last group + * possibly smaller than requested due to end-of-stream. + */ + def grouped(n: Int): Duct[In, immutable.Seq[Out]] + + /** + * Transform each input element into a sequence of output elements that is + * then flattened into the output stream. + */ + def mapConcat[U](f: Out ⇒ immutable.Seq[U]): Duct[In, U] + + /** + * Generic transformation of a stream: for each element the [[Transformer#onNext]] + * function is invoked and expecting a (possibly empty) sequence of output elements + * to be produced. + * After handing off the elements produced from one input element to the downstream + * consumers, the [[Transformer#isComplete]] predicate determines whether to end + * stream processing at this point; in that case the upstream subscription is + * canceled. Before signaling normal completion to the downstream consumers, + * the [[Transformer#onComplete]] function is invoked to produce a (possibly empty) + * sequence of elements in response to the end-of-stream event. + * + * After normal completion or error the [[Transformer#cleanup]] function is called. + * + * It is possible to keep state in the concrete [[Transformer]] instance with + * ordinary instance variables. The [[Transformer]] is executed by an actor and + * therefore you don not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + */ + def transform[U](transformer: Transformer[Out, U]): Duct[In, U] + + /** + * This transformation stage works exactly like [[#transform]] with the + * change that failure signaled from upstream will invoke + * [[RecoveryTransformer#onError]], which can emit an additional sequence of + * elements before the stream ends. + * + * After normal completion or error the [[RecoveryTransformer#cleanup]] function + * is called. + */ + def transformRecover[U](recoveryTransformer: RecoveryTransformer[Out, U]): Duct[In, U] + + /** + * This operation demultiplexes the incoming stream into separate output + * streams, one for each element key. The key is computed for each element + * using the given function. When a new key is encountered for the first time + * it is emitted to the downstream consumer together with a fresh + * producer that will eventually produce all the elements of the substream + * for that key. Not consuming the elements from the created streams will + * stop this processor from processing more elements, therefore you must take + * care to unblock (or cancel) all of the produced streams even if you want + * to consume only one of them. + */ + def groupBy[K](f: Out ⇒ K): Duct[In, (K, Producer[Out @uncheckedVariance])] + + /** + * This operation applies the given predicate to all incoming elements and + * emits them to a stream of output streams, always beginning a new one with + * the current element if the given predicate returns true for it. This means + * that for the following series of predicate values, three substreams will + * be produced with lengths 1, 2, and 3: + * + * {{{ + * false, // element goes into first substream + * true, false, // elements go into second substream + * true, false, false // elements go into third substream + * }}} + */ + def splitWhen(p: Out ⇒ Boolean): Duct[In, Producer[Out @uncheckedVariance]] + + /** + * Merge this stream with the one emitted by the given producer, taking + * elements as they arrive from either side (picking randomly when both + * have elements ready). + */ + def merge[U >: Out](other: Producer[U]): Duct[In, U] + + /** + * Zip this stream together with the one emitted by the given producer. + * This transformation finishes when either input stream reaches its end, + * cancelling the subscription to the other one. + */ + def zip[U](other: Producer[U]): Duct[In, (Out, U)] + + /** + * Concatenate the given other stream to this stream so that the first element + * emitted by the given producer is emitted after the last element of this + * stream. + */ + def concat[U >: Out](next: Producer[U]): Duct[In, U] + + /** + * Fan-out the stream to another consumer. Each element is produced to + * the `other` consumer as well as to downstream consumers. It will + * not shutdown until the subscriptions for `other` and at least + * one downstream consumer have been established. + */ + def tee(other: Consumer[_ >: Out]): Duct[In, Out] + + /** + * Materialize this `Duct` by attaching it to the specified downstream `consumer` + * and return a `Consumer` representing the input side of the `Duct`. + * The returned `Consumer` can later be connected to an upstream `Producer`. + * + * *This will materialize the flow and initiate its execution.* + * + * The given FlowMaterializer decides how the flow’s logical structure is + * broken down into individual processing steps. + */ + def produceTo(materializer: FlowMaterializer, consumer: Consumer[Out] @uncheckedVariance): Consumer[In] + + /** + * Attaches a consumer to this stream which will just discard all received + * elements. The returned `Consumer` represents the input side of the `Duct` and can + * later be connected to an upstream `Producer`. + * + * *This will materialize the flow and initiate its execution.* + * + * The given FlowMaterializer decides how the flow’s logical structure is + * broken down into individual processing steps. + */ + def consume(materializer: FlowMaterializer): Consumer[In] + + /** + * When this flow is completed, either through an error or normal + * completion, apply the provided function with [[scala.util.Success]] + * or [[scala.util.Failure]]. The returned `Consumer` represents the input side of + * the `Duct` and can later be connected to an upstream `Producer`. + * + * *This operation materializes the flow and initiates its execution.* + */ + def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Consumer[In] + + /** + * Materialize this `Duct` into a `Consumer` representing the input side of the `Duct` + * and a `Producer`representing the output side of the the `Duct`. + * + * The returned `Producer` can later be connected to an downstream `Consumer`. + * The returned `Consumer` can later be connected to an upstream `Producer`. + * + * *This will materialize the flow and initiate its execution.* + * + * The given FlowMaterializer decides how the flow’s logical structure is + * broken down into individual processing steps. + */ + def build(materializer: FlowMaterializer): (Consumer[In], Producer[Out] @uncheckedVariance) + +} + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 3c56e463e2..b0462c98cd 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -111,7 +111,7 @@ trait Flow[+T] { /** * Invoke the given procedure for each received element and produce a Unit value * upon reaching the normal end of the stream. Please note that also in this case - * the flow needs to be materialized (e.g. using [[#consume]]) to initiate its + * the `Flow` needs to be materialized (e.g. using [[#consume]]) to initiate its * execution. */ def foreach(c: T ⇒ Unit): Flow[Unit] @@ -288,7 +288,7 @@ trait Flow[+T] { * The given FlowMaterializer decides how the flow’s logical structure is * broken down into individual processing steps. */ - def produceTo(materializer: FlowMaterializer, consumer: Consumer[T @uncheckedVariance]): Unit + def produceTo(materializer: FlowMaterializer, consumer: Consumer[_ >: T]): Unit } diff --git a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala new file mode 100644 index 0000000000..91f4bc7561 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala @@ -0,0 +1,172 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import org.reactivestreams.api.Consumer +import org.reactivestreams.api.Producer +import akka.stream.scaladsl.Duct +import akka.stream.scaladsl.Flow +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import scala.util.Success +import scala.util.Failure + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class DuctSpec extends AkkaSpec { + + val materializer = FlowMaterializer(MaterializerSettings()) + + "A Duct" must { + + "materialize into Producer/Consumer" in { + val duct: Duct[String, String] = Duct[String] + val (ductIn: Consumer[String], ductOut: Producer[String]) = duct.build(materializer) + + val c1 = StreamTestKit.consumerProbe[String] + ductOut.produceTo(c1) + + val source: Producer[String] = Flow(List("1", "2", "3")).toProducer(materializer) + source.produceTo(ductIn) + + val sub1 = c1.expectSubscription + sub1.requestMore(3) + c1.expectNext("1") + c1.expectNext("2") + c1.expectNext("3") + c1.expectComplete + } + + "materialize into Producer/Consumer and transformation processor" in { + val duct: Duct[Int, String] = Duct[Int].map((i: Int) ⇒ i.toString) + val (ductIn: Consumer[Int], ductOut: Producer[String]) = duct.build(materializer) + + val c1 = StreamTestKit.consumerProbe[String] + ductOut.produceTo(c1) + val sub1 = c1.expectSubscription + sub1.requestMore(3) + c1.expectNoMsg(200.millis) + + val source: Producer[Int] = Flow(List(1, 2, 3)).toProducer(materializer) + source.produceTo(ductIn) + + c1.expectNext("1") + c1.expectNext("2") + c1.expectNext("3") + c1.expectComplete + } + + "materialize into Producer/Consumer and multiple transformation processors" in { + val duct = Duct[Int].map(_.toString).map("elem-" + _) + val (ductIn, ductOut) = duct.build(materializer) + + val c1 = StreamTestKit.consumerProbe[String] + ductOut.produceTo(c1) + val sub1 = c1.expectSubscription + sub1.requestMore(3) + c1.expectNoMsg(200.millis) + + val source: Producer[Int] = Flow(List(1, 2, 3)).toProducer(materializer) + source.produceTo(ductIn) + + c1.expectNext("elem-1") + c1.expectNext("elem-2") + c1.expectNext("elem-3") + c1.expectComplete + } + + "produceTo Consumer" in { + val duct: Duct[String, String] = Duct[String] + val c1 = StreamTestKit.consumerProbe[String] + val c2: Consumer[String] = duct.produceTo(materializer, c1) + val source: Producer[String] = Flow(List("1", "2", "3")).toProducer(materializer) + source.produceTo(c2) + + val sub1 = c1.expectSubscription + sub1.requestMore(3) + c1.expectNext("1") + c1.expectNext("2") + c1.expectNext("3") + c1.expectComplete + } + + "perform transformation operation" in { + val duct = Duct[Int].map(i ⇒ { testActor ! i.toString; i.toString }) + val c = duct.consume(materializer) + + val source = Flow(List(1, 2, 3)).toProducer(materializer) + source.produceTo(c) + + expectMsg("1") + expectMsg("2") + expectMsg("3") + } + + "perform multiple transformation operations" in { + val duct = Duct[Int].map(_.toString).map("elem-" + _).foreach(testActor ! _) + val c = duct.consume(materializer) + + val source = Flow(List(1, 2, 3)).toProducer(materializer) + source.produceTo(c) + + expectMsg("elem-1") + expectMsg("elem-2") + expectMsg("elem-3") + } + + "perform transformation operation and produceTo Consumer" in { + val duct = Duct[Int].map(_.toString) + val c1 = StreamTestKit.consumerProbe[String] + val c2: Consumer[Int] = duct.produceTo(materializer, c1) + + val sub1 = c1.expectSubscription + sub1.requestMore(3) + c1.expectNoMsg(200.millis) + + val source: Producer[Int] = Flow(List(1, 2, 3)).toProducer(materializer) + source.produceTo(c2) + + c1.expectNext("1") + c1.expectNext("2") + c1.expectNext("3") + c1.expectComplete + } + + "perform multiple transformation operations and produceTo Consumer" in { + val duct = Duct[Int].map(_.toString).map("elem-" + _) + val c1 = StreamTestKit.consumerProbe[String] + val c2 = duct.produceTo(materializer, c1) + + val sub1 = c1.expectSubscription + sub1.requestMore(3) + c1.expectNoMsg(200.millis) + + val source: Producer[Int] = Flow(List(1, 2, 3)).toProducer(materializer) + source.produceTo(c2) + + c1.expectNext("elem-1") + c1.expectNext("elem-2") + c1.expectNext("elem-3") + c1.expectComplete + } + + "call onComplete callback when done" in { + val duct = Duct[Int].map(i ⇒ { testActor ! i.toString; i.toString }) + val c = duct.onComplete(materializer) { + case Success(_) ⇒ testActor ! "DONE" + case Failure(e) ⇒ testActor ! e + } + + val source = Flow(List(1, 2, 3)).toProducer(materializer) + source.produceTo(c) + + expectMsg("1") + expectMsg("2") + expectMsg("3") + expectMsg("DONE") + } + + } + +}