diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala index 778313436c..d0be176fe9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala @@ -30,7 +30,7 @@ trait Drain[-In] extends Sink[In] /** * A drain that does not need to create a user-accessible object during materialization. */ -trait SimpleDrain[-In] extends Drain[In] with DrainOps[In] { +trait SimpleDrain[-In] extends Drain[In] { /** * Attach this drain to the given [[org.reactivestreams.Publisher]]. Using the given * [[FlowMaterializer]] is completely optional, especially if this drain belongs to @@ -63,7 +63,7 @@ trait SimpleDrain[-In] extends Drain[In] with DrainOps[In] { * to retrieve in order to access aspects of this drain (could be a completion Future * or a cancellation handle, etc.) */ -trait DrainWithKey[-In, T] extends DrainOps[In] { +trait DrainWithKey[-In, T] extends Drain[In] { /** * Attach this drain to the given [[org.reactivestreams.Publisher]]. Using the given * [[FlowMaterializer]] is completely optional, especially if this drain belongs to @@ -269,8 +269,3 @@ trait MaterializedDrain { */ def getDrainFor[T](drainKey: DrainWithKey[_, T]): T } - -trait DrainOps[-In] extends Drain[In] { - override def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] = - Flow[In].connect(this).toSubscriber() -} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index 341cdb0336..4e4ee3f145 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -3,9 +3,11 @@ */ package akka.stream.scaladsl2 +import akka.stream.impl2.Ast._ import akka.stream.{ TimerTransformer, Transformer, OverflowStrategy } +import akka.util.Collections.EmptyImmutableSeq import scala.collection.immutable -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.Future import scala.language.higherKinds @@ -35,23 +37,44 @@ object Flow { def apply[T]: Flow[T, T] = Pipe.empty[T] } +/** + * Flow with attached input and output, can be executed. + */ +trait RunnableFlow { + def run()(implicit materializer: FlowMaterializer): MaterializedFlow +} + +/** + * Returned by [[RunnableFlow#run]] and can be used as parameter to the + * accessor method to retrieve the materialized `Tap` or `Drain`, e.g. + * [[SubscriberTap#subscriber]] or [[PublisherDrain#publisher]]. + */ +trait MaterializedFlow extends MaterializedTap with MaterializedDrain + /** * Scala API: Operations offered by Flows and Sources with a free output side: the DSL flows left-to-right only. */ trait FlowOps[+Out] { + import FlowOps._ type Repr[+O] /** * Transform this stream by applying the given function to each of the elements * as they pass through this processing step. */ - def map[T](f: Out ⇒ T): Repr[T] + def map[T](f: Out ⇒ T): Repr[T] = + transform("map", () ⇒ new Transformer[Out, T] { + def onNext(in: Out) = List(f(in)) + }) /** * Transform each input element into a sequence of output elements that is * then flattened into the output stream. */ - def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] + def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] = + transform("mapConcat", () ⇒ new Transformer[Out, T] { + def onNext(in: Out) = f(in) + }) /** * Transform this stream by applying the given function to each of the elements @@ -62,7 +85,8 @@ trait FlowOps[+Out] { * * @see [[#mapAsyncUnordered]] */ - def mapAsync[T](f: Out ⇒ Future[T]): Repr[T] + def mapAsync[T](f: Out ⇒ Future[T]): Repr[T] = + andThen(MapAsync(f.asInstanceOf[Any ⇒ Future[Any]])) /** * Transform this stream by applying the given function to each of the elements @@ -74,19 +98,26 @@ trait FlowOps[+Out] { * * @see [[#mapAsync]] */ - def mapAsyncUnordered[T](f: Out ⇒ Future[T]): Repr[T] + def mapAsyncUnordered[T](f: Out ⇒ Future[T]): Repr[T] = + andThen(MapAsyncUnordered(f.asInstanceOf[Any ⇒ Future[Any]])) /** * Only pass on those elements that satisfy the given predicate. */ - def filter(p: Out ⇒ Boolean): Repr[Out] + def filter(p: Out ⇒ Boolean): Repr[Out] = + transform("filter", () ⇒ new Transformer[Out, Out] { + def onNext(in: Out) = if (p(in)) List(in) else Nil + }) /** * Transform this stream by applying the given partial function to each of the elements * on which the function is defined as they pass through this processing step. * Non-matching elements are filtered out. */ - def collect[T](pf: PartialFunction[Out, T]): Repr[T] + def collect[T](pf: PartialFunction[Out, T]): Repr[T] = + transform("collect", () ⇒ new Transformer[Out, T] { + def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil + }) /** * Chunk up this stream into groups of the given size, with the last group @@ -94,7 +125,22 @@ trait FlowOps[+Out] { * * `n` must be positive, otherwise IllegalArgumentException is thrown. */ - def grouped(n: Int): Repr[immutable.Seq[Out]] + def grouped(n: Int): Repr[immutable.Seq[Out]] = { + require(n > 0, "n must be greater than 0") + transform("grouped", () ⇒ new Transformer[Out, immutable.Seq[Out]] { + var buf: Vector[Out] = Vector.empty + def onNext(in: Out) = { + buf :+= in + if (buf.size == n) { + val group = buf + buf = Vector.empty + List(group) + } else + Nil + } + override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf) + }) + } /** * Chunk up this stream into groups of elements received within a time window, @@ -106,18 +152,72 @@ trait FlowOps[+Out] { * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ - def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] + def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] = { + require(n > 0, "n must be greater than 0") + require(d > Duration.Zero) + timerTransform("groupedWithin", () ⇒ new TimerTransformer[Out, immutable.Seq[Out]] { + schedulePeriodically(GroupedWithinTimerKey, d) + var buf: Vector[Out] = Vector.empty + + def onNext(in: Out) = { + buf :+= in + if (buf.size == n) { + // start new time window + schedulePeriodically(GroupedWithinTimerKey, d) + emitGroup() + } else Nil + } + override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf) + def onTimer(timerKey: Any) = emitGroup() + private def emitGroup(): immutable.Seq[immutable.Seq[Out]] = + if (buf.isEmpty) EmptyImmutableSeq + else { + val group = buf + buf = Vector.empty + List(group) + } + }) + } /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. */ - def drop(n: Int): Repr[Out] + def drop(n: Int): Repr[Out] = + transform("drop", () ⇒ new Transformer[Out, Out] { + var delegate: Transformer[Out, Out] = + if (n <= 0) identityTransformer.asInstanceOf[Transformer[Out, Out]] + else new Transformer[Out, Out] { + var c = n + def onNext(in: Out) = { + c -= 1 + if (c == 0) + delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]] + Nil + } + } + + def onNext(in: Out) = delegate.onNext(in) + }) /** * Discard the elements received within the given duration at beginning of the stream. */ - def dropWithin(d: FiniteDuration): Repr[Out] + def dropWithin(d: FiniteDuration): Repr[Out] = + timerTransform("dropWithin", () ⇒ new TimerTransformer[Out, Out] { + scheduleOnce(DropWithinTimerKey, d) + + var delegate: Transformer[Out, Out] = + new Transformer[Out, Out] { + def onNext(in: Out) = Nil + } + + def onNext(in: Out) = delegate.onNext(in) + def onTimer(timerKey: Any) = { + delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]] + Nil + } + }) /** * Terminate processing (and cancel the upstream publisher) after the given @@ -128,7 +228,23 @@ trait FlowOps[+Out] { * The stream will be completed without producing any elements if `n` is zero * or negative. */ - def take(n: Int): Repr[Out] + def take(n: Int): Repr[Out] = + transform("take", () ⇒ new Transformer[Out, Out] { + var delegate: Transformer[Out, Out] = + if (n <= 0) takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] + else new Transformer[Out, Out] { + var c = n + def onNext(in: Out) = { + c -= 1 + if (c == 0) + delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] + List(in) + } + } + + def onNext(in: Out) = delegate.onNext(in) + override def isComplete = delegate.isComplete + }) /** * Terminate processing (and cancel the upstream publisher) after the given @@ -139,7 +255,19 @@ trait FlowOps[+Out] { * Note that this can be combined with [[#take]] to limit the number of elements * within the duration. */ - def takeWithin(d: FiniteDuration): Repr[Out] + def takeWithin(d: FiniteDuration): Repr[Out] = + timerTransform("takeWithin", () ⇒ new TimerTransformer[Out, Out] { + scheduleOnce(TakeWithinTimerKey, d) + + var delegate: Transformer[Out, Out] = identityTransformer.asInstanceOf[Transformer[Out, Out]] + + def onNext(in: Out) = delegate.onNext(in) + override def isComplete = delegate.isComplete + def onTimer(timerKey: Any) = { + delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] + Nil + } + }) /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary @@ -152,7 +280,8 @@ trait FlowOps[+Out] { * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate */ - def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[S] + def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[S] = + andThen(Conflate(seed.asInstanceOf[Any ⇒ Any], aggregate.asInstanceOf[(Any, Any) ⇒ Any])) /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older @@ -167,7 +296,8 @@ trait FlowOps[+Out] { * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation * state. */ - def expand[S, U](seed: Out ⇒ S, extrapolate: S ⇒ (U, S)): Repr[U] + def expand[S, U](seed: Out ⇒ S, extrapolate: S ⇒ (U, S)): Repr[U] = + andThen(Expand(seed.asInstanceOf[Any ⇒ Any], extrapolate.asInstanceOf[Any ⇒ (Any, Any)])) /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. @@ -177,7 +307,10 @@ trait FlowOps[+Out] { * @param size The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] + def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = { + require(size > 0, s"Buffer size must be larger than zero but was [$size]") + andThen(Buffer(size, overflowStrategy)) + } /** * Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]] @@ -201,14 +334,17 @@ trait FlowOps[+Out] { * * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer. */ - def transform[T](name: String, mkTransformer: () ⇒ Transformer[Out, T]): Repr[T] + def transform[T](name: String, mkTransformer: () ⇒ Transformer[Out, T]): Repr[T] = { + andThen(Transform(name, mkTransformer.asInstanceOf[() ⇒ Transformer[Any, Any]])) + } /** * Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element * and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair * of an empty collection and a stream containing the whole upstream unchanged. */ - def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U])] + def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U])] = + andThen(PrefixAndTail(n)) /** * This operation demultiplexes the incoming stream into separate output @@ -221,7 +357,8 @@ trait FlowOps[+Out] { * care to unblock (or cancel) all of the produced streams even if you want * to consume only one of them. */ - def groupBy[K, U >: Out](f: Out ⇒ K): Repr[(K, Source[U])] + def groupBy[K, U >: Out](f: Out ⇒ K): Repr[(K, Source[U])] = + andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) /** * This operation applies the given predicate to all incoming elements and @@ -236,13 +373,17 @@ trait FlowOps[+Out] { * true, false, false // elements go into third substream * }}} */ - def splitWhen[U >: Out](p: Out ⇒ Boolean): Repr[Source[U]] + def splitWhen[U >: Out](p: Out ⇒ Boolean): Repr[Source[U]] = + andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) /** * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. * This operation can be used on a stream of element type [[Source]]. */ - def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U] + def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U] = strategy match { + case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll) + case _ ⇒ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getSimpleName}]") + } /** * Transformation of a stream, with additional support for scheduled events. @@ -268,19 +409,28 @@ trait FlowOps[+Out] { * * Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation. */ - def timerTransform[U](name: String, mkTransformer: () ⇒ TimerTransformer[Out, U]): Repr[U] + def timerTransform[U](name: String, mkTransformer: () ⇒ TimerTransformer[Out, U]): Repr[U] = + andThen(TimerTransform(name, mkTransformer.asInstanceOf[() ⇒ TimerTransformer[Any, Any]])) + + /** INTERNAL API */ + // Storing ops in reverse order + protected def andThen[U](op: AstNode): Repr[U] } /** - * Flow with attached input and output, can be executed. + * INTERNAL API */ -trait RunnableFlow { - def run()(implicit materializer: FlowMaterializer): MaterializedFlow -} +private[scaladsl2] object FlowOps { + private case object TakeWithinTimerKey + private case object DropWithinTimerKey + private case object GroupedWithinTimerKey -/** - * Returned by [[RunnableFlow#run]] and can be used as parameter to the - * accessor method to retrieve the materialized `Tap` or `Drain`, e.g. - * [[SubscriberTap#subscriber]] or [[PublisherDrain#publisher]]. - */ -trait MaterializedFlow extends MaterializedTap with MaterializedDrain + private val takeCompletedTransformer: Transformer[Any, Any] = new Transformer[Any, Any] { + override def onNext(elem: Any) = Nil + override def isComplete = true + } + + private val identityTransformer: Transformer[Any, Any] = new Transformer[Any, Any] { + override def onNext(elem: Any) = List(elem) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala index ec0dabc20c..7de9622334 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala @@ -3,214 +3,13 @@ */ package akka.stream.scaladsl2 -import scala.collection.immutable -import akka.stream.impl2.Ast._ +import akka.stream.impl2.Ast.AstNode import org.reactivestreams._ -import scala.concurrent.Future -import akka.stream.Transformer -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.duration.Duration -import akka.util.Collections.EmptyImmutableSeq -import akka.stream.TimerTransformer -import akka.stream.OverflowStrategy import scala.annotation.unchecked.uncheckedVariance import scala.language.higherKinds import scala.language.existentials -private[scaladsl2] object PipeOps { - private case object TakeWithinTimerKey - private case object DropWithinTimerKey - private case object GroupedWithinTimerKey - - private val takeCompletedTransformer: Transformer[Any, Any] = new Transformer[Any, Any] { - override def onNext(elem: Any) = Nil - override def isComplete = true - } - - private val identityTransformer: Transformer[Any, Any] = new Transformer[Any, Any] { - override def onNext(elem: Any) = List(elem) - } -} - -/** - * Scala API: Operations offered by flows with a free output side: the DSL flows left-to-right only. - */ -private[scaladsl2] trait PipeOps[+Out] extends FlowOps[Out] { - import PipeOps._ - type Repr[+O] - - // Storing ops in reverse order - protected def andThen[U](op: AstNode): Repr[U] - - override def map[T](f: Out ⇒ T): Repr[T] = - transform("map", () ⇒ new Transformer[Out, T] { - override def onNext(in: Out) = List(f(in)) - }) - - override def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] = - transform("mapConcat", () ⇒ new Transformer[Out, T] { - override def onNext(in: Out) = f(in) - }) - - override def mapAsync[T](f: Out ⇒ Future[T]): Repr[T] = - andThen(MapAsync(f.asInstanceOf[Any ⇒ Future[Any]])) - - override def mapAsyncUnordered[T](f: Out ⇒ Future[T]): Repr[T] = - andThen(MapAsyncUnordered(f.asInstanceOf[Any ⇒ Future[Any]])) - - override def filter(p: Out ⇒ Boolean): Repr[Out] = - transform("filter", () ⇒ new Transformer[Out, Out] { - override def onNext(in: Out) = if (p(in)) List(in) else Nil - }) - - override def collect[T](pf: PartialFunction[Out, T]): Repr[T] = - transform("collect", () ⇒ new Transformer[Out, T] { - override def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil - }) - - override def grouped(n: Int): Repr[immutable.Seq[Out]] = { - require(n > 0, "n must be greater than 0") - transform("grouped", () ⇒ new Transformer[Out, immutable.Seq[Out]] { - var buf: Vector[Out] = Vector.empty - override def onNext(in: Out) = { - buf :+= in - if (buf.size == n) { - val group = buf - buf = Vector.empty - List(group) - } else - Nil - } - override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf) - }) - } - - override def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] = { - require(n > 0, "n must be greater than 0") - require(d > Duration.Zero) - timerTransform("groupedWithin", () ⇒ new TimerTransformer[Out, immutable.Seq[Out]] { - schedulePeriodically(GroupedWithinTimerKey, d) - var buf: Vector[Out] = Vector.empty - - override def onNext(in: Out) = { - buf :+= in - if (buf.size == n) { - // start new time window - schedulePeriodically(GroupedWithinTimerKey, d) - emitGroup() - } else Nil - } - override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf) - override def onTimer(timerKey: Any) = emitGroup() - private def emitGroup(): immutable.Seq[immutable.Seq[Out]] = - if (buf.isEmpty) EmptyImmutableSeq - else { - val group = buf - buf = Vector.empty - List(group) - } - }) - } - - override def drop(n: Int): Repr[Out] = - transform("drop", () ⇒ new Transformer[Out, Out] { - var delegate: Transformer[Out, Out] = - if (n <= 0) identityTransformer.asInstanceOf[Transformer[Out, Out]] - else new Transformer[Out, Out] { - var c = n - override def onNext(in: Out) = { - c -= 1 - if (c == 0) - delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]] - Nil - } - } - - override def onNext(in: Out) = delegate.onNext(in) - }) - - override def dropWithin(d: FiniteDuration): Repr[Out] = - timerTransform("dropWithin", () ⇒ new TimerTransformer[Out, Out] { - scheduleOnce(DropWithinTimerKey, d) - - var delegate: Transformer[Out, Out] = - new Transformer[Out, Out] { - override def onNext(in: Out) = Nil - } - - override def onNext(in: Out) = delegate.onNext(in) - override def onTimer(timerKey: Any) = { - delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]] - Nil - } - }) - - override def take(n: Int): Repr[Out] = - transform("take", () ⇒ new Transformer[Out, Out] { - var delegate: Transformer[Out, Out] = - if (n <= 0) takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] - else new Transformer[Out, Out] { - var c = n - override def onNext(in: Out) = { - c -= 1 - if (c == 0) - delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] - List(in) - } - } - - override def onNext(in: Out) = delegate.onNext(in) - override def isComplete = delegate.isComplete - }) - - override def takeWithin(d: FiniteDuration): Repr[Out] = - timerTransform("takeWithin", () ⇒ new TimerTransformer[Out, Out] { - scheduleOnce(TakeWithinTimerKey, d) - - var delegate: Transformer[Out, Out] = identityTransformer.asInstanceOf[Transformer[Out, Out]] - - override def onNext(in: Out) = delegate.onNext(in) - override def isComplete = delegate.isComplete - override def onTimer(timerKey: Any) = { - delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] - Nil - } - }) - - override def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[S] = - andThen(Conflate(seed.asInstanceOf[Any ⇒ Any], aggregate.asInstanceOf[(Any, Any) ⇒ Any])) - - override def expand[S, U](seed: Out ⇒ S, extrapolate: S ⇒ (U, S)): Repr[U] = - andThen(Expand(seed.asInstanceOf[Any ⇒ Any], extrapolate.asInstanceOf[Any ⇒ (Any, Any)])) - - override def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = { - require(size > 0, s"Buffer size must be larger than zero but was [$size]") - andThen(Buffer(size, overflowStrategy)) - } - - override def transform[T](name: String, mkTransformer: () ⇒ Transformer[Out, T]): Repr[T] = { - andThen(Transform(name, mkTransformer.asInstanceOf[() ⇒ Transformer[Any, Any]])) - } - - override def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U])] = - andThen(PrefixAndTail(n)) - - override def groupBy[K, U >: Out](f: Out ⇒ K): Repr[(K, Source[U])] = - andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) - - override def splitWhen[U >: Out](p: Out ⇒ Boolean): Repr[Source[U]] = - andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) - - override def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U] = strategy match { - case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll) - case _ ⇒ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getSimpleName}]") - } - - override def timerTransform[U](name: String, mkTransformer: () ⇒ TimerTransformer[Out, U]): Repr[U] = - andThen(TimerTransform(name, mkTransformer.asInstanceOf[() ⇒ TimerTransformer[Any, Any]])) -} - private[scaladsl2] object Pipe { private val emptyInstance = Pipe[Any, Any](ops = Nil) def empty[T]: Pipe[T, T] = emptyInstance.asInstanceOf[Pipe[T, T]] @@ -221,7 +20,7 @@ private[scaladsl2] object Pipe { /** * Flow with one open input and one open output.. */ -private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flow[In, Out] with PipeOps[Out] { +private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flow[In, Out] { override type Repr[+O] = Pipe[In @uncheckedVariance, O] override protected def andThen[U](op: AstNode): Repr[U] = this.copy(ops = op :: ops) @@ -263,7 +62,7 @@ private[scaladsl2] final case class SinkPipe[-In](output: Drain[_], ops: List[As /** * Pipe with open output and attached input. Can be used as a `Publisher`. */ -private[scaladsl2] final case class SourcePipe[+Out](input: Tap[_], ops: List[AstNode]) extends Source[Out] with PipeOps[Out] { +private[scaladsl2] final case class SourcePipe[+Out](input: Tap[_], ops: List[AstNode]) extends Source[Out] { override type Repr[+O] = SourcePipe[O] override protected def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala index 16fa3d1d60..cbcd8a323d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala @@ -13,5 +13,6 @@ import scala.annotation.unchecked.uncheckedVariance * Can be used as a `Subscriber` */ trait Sink[-In] { - def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] + def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] = + Flow[In].connect(this).toSubscriber() } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala index 9d8354099e..edcac2250d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala @@ -5,7 +5,8 @@ package akka.stream.scaladsl2 import akka.stream.impl._ import akka.stream.impl2.ActorBasedFlowMaterializer -import akka.stream.{ Transformer, OverflowStrategy, TimerTransformer } +import akka.stream.impl2.Ast.AstNode +import akka.stream.{ scaladsl2, Transformer, OverflowStrategy, TimerTransformer } import org.reactivestreams.{ Publisher, Subscriber } import scala.collection.immutable import scala.concurrent.duration.FiniteDuration @@ -24,12 +25,35 @@ import scala.annotation.unchecked.uncheckedVariance * FlowMaterializers can be used but must then implement the functionality of these * Tap nodes themselves (or construct an ActorBasedFlowMaterializer). */ -trait Tap[+Out] extends Source[Out] +trait Tap[+Out] extends Source[Out] { + override type Repr[+O] = SourcePipe[O] + + private def sourcePipe = Pipe.empty[Out].withTap(this) + + override def connect[T](flow: Flow[Out, T]): Source[T] = sourcePipe.connect(flow) + + override def connect(sink: Sink[Out]): RunnableFlow = sourcePipe.connect(sink) + + override def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = + sourcePipe.toPublisher()(materializer) + + override def toFanoutPublisher(initialBufferSize: Int, maximumBufferSize: Int)(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = + sourcePipe.toFanoutPublisher(initialBufferSize, maximumBufferSize)(materializer) + + override def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer): Unit = + sourcePipe.publishTo(subscriber)(materializer) + + override def consume()(implicit materializer: FlowMaterializer): Unit = + sourcePipe.consume() + + /** INTERNAL API */ + override protected def andThen[U](op: AstNode) = SourcePipe(this, List(op)) +} /** * A tap that does not need to create a user-accessible object during materialization. */ -trait SimpleTap[+Out] extends Tap[Out] with TapOps[Out] { +trait SimpleTap[+Out] extends Tap[Out] { /** * Attach this tap to the given [[org.reactivestreams.Subscriber]]. Using the given * [[FlowMaterializer]] is completely optional, especially if this tap belongs to @@ -65,7 +89,7 @@ trait SimpleTap[+Out] extends Tap[Out] with TapOps[Out] { * to retrieve in order to access aspects of this tap (could be a Subscriber, a * Future/Promise, etc.). */ -trait TapWithKey[+Out, T] extends TapOps[Out] { +trait TapWithKey[+Out, T] extends Tap[Out] { /** * Attach this tap to the given [[org.reactivestreams.Subscriber]]. Using the given * [[FlowMaterializer]] is completely optional, especially if this tap belongs to @@ -212,68 +236,3 @@ trait MaterializedTap { */ def getTapFor[T](tapKey: TapWithKey[_, T]): T } - -trait TapOps[+Out] extends Tap[Out] { - override type Repr[+O] = SourcePipe[O] - - private def sourcePipe = Pipe.empty[Out].withTap(this) - - override def connect[T](flow: Flow[Out, T]): Source[T] = sourcePipe.connect(flow) - - override def connect(sink: Sink[Out]): RunnableFlow = sourcePipe.connect(sink) - - override def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = - sourcePipe.toPublisher()(materializer) - - override def toFanoutPublisher(initialBufferSize: Int, maximumBufferSize: Int)(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = - sourcePipe.toFanoutPublisher(initialBufferSize, maximumBufferSize)(materializer) - - override def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer): Unit = - sourcePipe.publishTo(subscriber)(materializer) - - override def consume()(implicit materializer: FlowMaterializer): Unit = - sourcePipe.consume() - - override def map[T](f: Out ⇒ T): Repr[T] = sourcePipe.map(f) - - override def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] = sourcePipe.mapConcat(f) - - override def mapAsync[T](f: Out ⇒ Future[T]): Repr[T] = sourcePipe.mapAsync(f) - - override def mapAsyncUnordered[T](f: Out ⇒ Future[T]): Repr[T] = sourcePipe.mapAsyncUnordered(f) - - override def filter(p: Out ⇒ Boolean): Repr[Out] = sourcePipe.filter(p) - - override def collect[T](pf: PartialFunction[Out, T]): Repr[T] = sourcePipe.collect(pf) - - override def grouped(n: Int): Repr[immutable.Seq[Out]] = sourcePipe.grouped(n) - - override def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] = sourcePipe.groupedWithin(n, d) - - override def drop(n: Int): Repr[Out] = sourcePipe.drop(n) - - override def dropWithin(d: FiniteDuration): Repr[Out] = sourcePipe.dropWithin(d) - - override def take(n: Int): Repr[Out] = sourcePipe.take(n) - - override def takeWithin(d: FiniteDuration): Repr[Out] = sourcePipe.takeWithin(d) - - override def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[S] = sourcePipe.conflate(seed, aggregate) - - override def expand[S, U](seed: Out ⇒ S, extrapolate: S ⇒ (U, S)): Repr[U] = sourcePipe.expand(seed, extrapolate) - - override def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = sourcePipe.buffer(size, overflowStrategy) - - override def transform[T](name: String, mkTransformer: () ⇒ Transformer[Out, T]): Repr[T] = sourcePipe.transform(name, mkTransformer) - - override def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U])] = sourcePipe.prefixAndTail(n) - - override def groupBy[K, U >: Out](f: Out ⇒ K): Repr[(K, Source[U])] = sourcePipe.groupBy(f) - - override def splitWhen[U >: Out](p: Out ⇒ Boolean): Repr[Source[U]] = sourcePipe.splitWhen(p) - - override def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U] = sourcePipe.flatten(strategy) - - override def timerTransform[U](name: String, mkTransformer: () ⇒ TimerTransformer[Out, U]): Repr[U] = - sourcePipe.timerTransform(name, mkTransformer) -}