Merge pull request #16032 from akka/wip-16028-clean-up-duplication-of-tap-source-flow-ops-ban

=str #16028 Cleaned up ducplicate implementations of FlowOps
This commit is contained in:
Patrik Nordwall 2014-10-06 11:34:01 +02:00
commit 21df1984d2
5 changed files with 217 additions and 313 deletions

View file

@ -30,7 +30,7 @@ trait Drain[-In] extends Sink[In]
/**
* A drain that does not need to create a user-accessible object during materialization.
*/
trait SimpleDrain[-In] extends Drain[In] with DrainOps[In] {
trait SimpleDrain[-In] extends Drain[In] {
/**
* Attach this drain to the given [[org.reactivestreams.Publisher]]. Using the given
* [[FlowMaterializer]] is completely optional, especially if this drain belongs to
@ -63,7 +63,7 @@ trait SimpleDrain[-In] extends Drain[In] with DrainOps[In] {
* to retrieve in order to access aspects of this drain (could be a completion Future
* or a cancellation handle, etc.)
*/
trait DrainWithKey[-In, T] extends DrainOps[In] {
trait DrainWithKey[-In, T] extends Drain[In] {
/**
* Attach this drain to the given [[org.reactivestreams.Publisher]]. Using the given
* [[FlowMaterializer]] is completely optional, especially if this drain belongs to
@ -269,8 +269,3 @@ trait MaterializedDrain {
*/
def getDrainFor[T](drainKey: DrainWithKey[_, T]): T
}
trait DrainOps[-In] extends Drain[In] {
override def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] =
Flow[In].connect(this).toSubscriber()
}

View file

@ -3,9 +3,11 @@
*/
package akka.stream.scaladsl2
import akka.stream.impl2.Ast._
import akka.stream.{ TimerTransformer, Transformer, OverflowStrategy }
import akka.util.Collections.EmptyImmutableSeq
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.Future
import scala.language.higherKinds
@ -35,23 +37,44 @@ object Flow {
def apply[T]: Flow[T, T] = Pipe.empty[T]
}
/**
* Flow with attached input and output, can be executed.
*/
trait RunnableFlow {
def run()(implicit materializer: FlowMaterializer): MaterializedFlow
}
/**
* Returned by [[RunnableFlow#run]] and can be used as parameter to the
* accessor method to retrieve the materialized `Tap` or `Drain`, e.g.
* [[SubscriberTap#subscriber]] or [[PublisherDrain#publisher]].
*/
trait MaterializedFlow extends MaterializedTap with MaterializedDrain
/**
* Scala API: Operations offered by Flows and Sources with a free output side: the DSL flows left-to-right only.
*/
trait FlowOps[+Out] {
import FlowOps._
type Repr[+O]
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step.
*/
def map[T](f: Out T): Repr[T]
def map[T](f: Out T): Repr[T] =
transform("map", () new Transformer[Out, T] {
def onNext(in: Out) = List(f(in))
})
/**
* Transform each input element into a sequence of output elements that is
* then flattened into the output stream.
*/
def mapConcat[T](f: Out immutable.Seq[T]): Repr[T]
def mapConcat[T](f: Out immutable.Seq[T]): Repr[T] =
transform("mapConcat", () new Transformer[Out, T] {
def onNext(in: Out) = f(in)
})
/**
* Transform this stream by applying the given function to each of the elements
@ -62,7 +85,8 @@ trait FlowOps[+Out] {
*
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](f: Out Future[T]): Repr[T]
def mapAsync[T](f: Out Future[T]): Repr[T] =
andThen(MapAsync(f.asInstanceOf[Any Future[Any]]))
/**
* Transform this stream by applying the given function to each of the elements
@ -74,19 +98,26 @@ trait FlowOps[+Out] {
*
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](f: Out Future[T]): Repr[T]
def mapAsyncUnordered[T](f: Out Future[T]): Repr[T] =
andThen(MapAsyncUnordered(f.asInstanceOf[Any Future[Any]]))
/**
* Only pass on those elements that satisfy the given predicate.
*/
def filter(p: Out Boolean): Repr[Out]
def filter(p: Out Boolean): Repr[Out] =
transform("filter", () new Transformer[Out, Out] {
def onNext(in: Out) = if (p(in)) List(in) else Nil
})
/**
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step.
* Non-matching elements are filtered out.
*/
def collect[T](pf: PartialFunction[Out, T]): Repr[T]
def collect[T](pf: PartialFunction[Out, T]): Repr[T] =
transform("collect", () new Transformer[Out, T] {
def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil
})
/**
* Chunk up this stream into groups of the given size, with the last group
@ -94,7 +125,22 @@ trait FlowOps[+Out] {
*
* `n` must be positive, otherwise IllegalArgumentException is thrown.
*/
def grouped(n: Int): Repr[immutable.Seq[Out]]
def grouped(n: Int): Repr[immutable.Seq[Out]] = {
require(n > 0, "n must be greater than 0")
transform("grouped", () new Transformer[Out, immutable.Seq[Out]] {
var buf: Vector[Out] = Vector.empty
def onNext(in: Out) = {
buf :+= in
if (buf.size == n) {
val group = buf
buf = Vector.empty
List(group)
} else
Nil
}
override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf)
})
}
/**
* Chunk up this stream into groups of elements received within a time window,
@ -106,18 +152,72 @@ trait FlowOps[+Out] {
* `n` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]]
def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] = {
require(n > 0, "n must be greater than 0")
require(d > Duration.Zero)
timerTransform("groupedWithin", () new TimerTransformer[Out, immutable.Seq[Out]] {
schedulePeriodically(GroupedWithinTimerKey, d)
var buf: Vector[Out] = Vector.empty
def onNext(in: Out) = {
buf :+= in
if (buf.size == n) {
// start new time window
schedulePeriodically(GroupedWithinTimerKey, d)
emitGroup()
} else Nil
}
override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf)
def onTimer(timerKey: Any) = emitGroup()
private def emitGroup(): immutable.Seq[immutable.Seq[Out]] =
if (buf.isEmpty) EmptyImmutableSeq
else {
val group = buf
buf = Vector.empty
List(group)
}
})
}
/**
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.
*/
def drop(n: Int): Repr[Out]
def drop(n: Int): Repr[Out] =
transform("drop", () new Transformer[Out, Out] {
var delegate: Transformer[Out, Out] =
if (n <= 0) identityTransformer.asInstanceOf[Transformer[Out, Out]]
else new Transformer[Out, Out] {
var c = n
def onNext(in: Out) = {
c -= 1
if (c == 0)
delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]]
Nil
}
}
def onNext(in: Out) = delegate.onNext(in)
})
/**
* Discard the elements received within the given duration at beginning of the stream.
*/
def dropWithin(d: FiniteDuration): Repr[Out]
def dropWithin(d: FiniteDuration): Repr[Out] =
timerTransform("dropWithin", () new TimerTransformer[Out, Out] {
scheduleOnce(DropWithinTimerKey, d)
var delegate: Transformer[Out, Out] =
new Transformer[Out, Out] {
def onNext(in: Out) = Nil
}
def onNext(in: Out) = delegate.onNext(in)
def onTimer(timerKey: Any) = {
delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]]
Nil
}
})
/**
* Terminate processing (and cancel the upstream publisher) after the given
@ -128,7 +228,23 @@ trait FlowOps[+Out] {
* The stream will be completed without producing any elements if `n` is zero
* or negative.
*/
def take(n: Int): Repr[Out]
def take(n: Int): Repr[Out] =
transform("take", () new Transformer[Out, Out] {
var delegate: Transformer[Out, Out] =
if (n <= 0) takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]]
else new Transformer[Out, Out] {
var c = n
def onNext(in: Out) = {
c -= 1
if (c == 0)
delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]]
List(in)
}
}
def onNext(in: Out) = delegate.onNext(in)
override def isComplete = delegate.isComplete
})
/**
* Terminate processing (and cancel the upstream publisher) after the given
@ -139,7 +255,19 @@ trait FlowOps[+Out] {
* Note that this can be combined with [[#take]] to limit the number of elements
* within the duration.
*/
def takeWithin(d: FiniteDuration): Repr[Out]
def takeWithin(d: FiniteDuration): Repr[Out] =
timerTransform("takeWithin", () new TimerTransformer[Out, Out] {
scheduleOnce(TakeWithinTimerKey, d)
var delegate: Transformer[Out, Out] = identityTransformer.asInstanceOf[Transformer[Out, Out]]
def onNext(in: Out) = delegate.onNext(in)
override def isComplete = delegate.isComplete
def onTimer(timerKey: Any) = {
delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]]
Nil
}
})
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
@ -152,7 +280,8 @@ trait FlowOps[+Out] {
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*/
def conflate[S](seed: Out S, aggregate: (S, Out) S): Repr[S]
def conflate[S](seed: Out S, aggregate: (S, Out) S): Repr[S] =
andThen(Conflate(seed.asInstanceOf[Any Any], aggregate.asInstanceOf[(Any, Any) Any]))
/**
* Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older
@ -167,7 +296,8 @@ trait FlowOps[+Out] {
* @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
* state.
*/
def expand[S, U](seed: Out S, extrapolate: S (U, S)): Repr[U]
def expand[S, U](seed: Out S, extrapolate: S (U, S)): Repr[U] =
andThen(Expand(seed.asInstanceOf[Any Any], extrapolate.asInstanceOf[Any (Any, Any)]))
/**
* Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full.
@ -177,7 +307,10 @@ trait FlowOps[+Out] {
* @param size The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out]
def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = {
require(size > 0, s"Buffer size must be larger than zero but was [$size]")
andThen(Buffer(size, overflowStrategy))
}
/**
* Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]]
@ -201,14 +334,17 @@ trait FlowOps[+Out] {
*
* Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer.
*/
def transform[T](name: String, mkTransformer: () Transformer[Out, T]): Repr[T]
def transform[T](name: String, mkTransformer: () Transformer[Out, T]): Repr[T] = {
andThen(Transform(name, mkTransformer.asInstanceOf[() Transformer[Any, Any]]))
}
/**
* Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element
* and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair
* of an empty collection and a stream containing the whole upstream unchanged.
*/
def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U])]
def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U])] =
andThen(PrefixAndTail(n))
/**
* This operation demultiplexes the incoming stream into separate output
@ -221,7 +357,8 @@ trait FlowOps[+Out] {
* care to unblock (or cancel) all of the produced streams even if you want
* to consume only one of them.
*/
def groupBy[K, U >: Out](f: Out K): Repr[(K, Source[U])]
def groupBy[K, U >: Out](f: Out K): Repr[(K, Source[U])] =
andThen(GroupBy(f.asInstanceOf[Any Any]))
/**
* This operation applies the given predicate to all incoming elements and
@ -236,13 +373,17 @@ trait FlowOps[+Out] {
* true, false, false // elements go into third substream
* }}}
*/
def splitWhen[U >: Out](p: Out Boolean): Repr[Source[U]]
def splitWhen[U >: Out](p: Out Boolean): Repr[Source[U]] =
andThen(SplitWhen(p.asInstanceOf[Any Boolean]))
/**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
* This operation can be used on a stream of element type [[Source]].
*/
def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U]
def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U] = strategy match {
case _: FlattenStrategy.Concat[Out] andThen(ConcatAll)
case _ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getSimpleName}]")
}
/**
* Transformation of a stream, with additional support for scheduled events.
@ -268,19 +409,28 @@ trait FlowOps[+Out] {
*
* Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation.
*/
def timerTransform[U](name: String, mkTransformer: () TimerTransformer[Out, U]): Repr[U]
def timerTransform[U](name: String, mkTransformer: () TimerTransformer[Out, U]): Repr[U] =
andThen(TimerTransform(name, mkTransformer.asInstanceOf[() TimerTransformer[Any, Any]]))
/** INTERNAL API */
// Storing ops in reverse order
protected def andThen[U](op: AstNode): Repr[U]
}
/**
* Flow with attached input and output, can be executed.
* INTERNAL API
*/
trait RunnableFlow {
def run()(implicit materializer: FlowMaterializer): MaterializedFlow
}
private[scaladsl2] object FlowOps {
private case object TakeWithinTimerKey
private case object DropWithinTimerKey
private case object GroupedWithinTimerKey
/**
* Returned by [[RunnableFlow#run]] and can be used as parameter to the
* accessor method to retrieve the materialized `Tap` or `Drain`, e.g.
* [[SubscriberTap#subscriber]] or [[PublisherDrain#publisher]].
*/
trait MaterializedFlow extends MaterializedTap with MaterializedDrain
private val takeCompletedTransformer: Transformer[Any, Any] = new Transformer[Any, Any] {
override def onNext(elem: Any) = Nil
override def isComplete = true
}
private val identityTransformer: Transformer[Any, Any] = new Transformer[Any, Any] {
override def onNext(elem: Any) = List(elem)
}
}

View file

@ -3,214 +3,13 @@
*/
package akka.stream.scaladsl2
import scala.collection.immutable
import akka.stream.impl2.Ast._
import akka.stream.impl2.Ast.AstNode
import org.reactivestreams._
import scala.concurrent.Future
import akka.stream.Transformer
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.Duration
import akka.util.Collections.EmptyImmutableSeq
import akka.stream.TimerTransformer
import akka.stream.OverflowStrategy
import scala.annotation.unchecked.uncheckedVariance
import scala.language.higherKinds
import scala.language.existentials
private[scaladsl2] object PipeOps {
private case object TakeWithinTimerKey
private case object DropWithinTimerKey
private case object GroupedWithinTimerKey
private val takeCompletedTransformer: Transformer[Any, Any] = new Transformer[Any, Any] {
override def onNext(elem: Any) = Nil
override def isComplete = true
}
private val identityTransformer: Transformer[Any, Any] = new Transformer[Any, Any] {
override def onNext(elem: Any) = List(elem)
}
}
/**
* Scala API: Operations offered by flows with a free output side: the DSL flows left-to-right only.
*/
private[scaladsl2] trait PipeOps[+Out] extends FlowOps[Out] {
import PipeOps._
type Repr[+O]
// Storing ops in reverse order
protected def andThen[U](op: AstNode): Repr[U]
override def map[T](f: Out T): Repr[T] =
transform("map", () new Transformer[Out, T] {
override def onNext(in: Out) = List(f(in))
})
override def mapConcat[T](f: Out immutable.Seq[T]): Repr[T] =
transform("mapConcat", () new Transformer[Out, T] {
override def onNext(in: Out) = f(in)
})
override def mapAsync[T](f: Out Future[T]): Repr[T] =
andThen(MapAsync(f.asInstanceOf[Any Future[Any]]))
override def mapAsyncUnordered[T](f: Out Future[T]): Repr[T] =
andThen(MapAsyncUnordered(f.asInstanceOf[Any Future[Any]]))
override def filter(p: Out Boolean): Repr[Out] =
transform("filter", () new Transformer[Out, Out] {
override def onNext(in: Out) = if (p(in)) List(in) else Nil
})
override def collect[T](pf: PartialFunction[Out, T]): Repr[T] =
transform("collect", () new Transformer[Out, T] {
override def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil
})
override def grouped(n: Int): Repr[immutable.Seq[Out]] = {
require(n > 0, "n must be greater than 0")
transform("grouped", () new Transformer[Out, immutable.Seq[Out]] {
var buf: Vector[Out] = Vector.empty
override def onNext(in: Out) = {
buf :+= in
if (buf.size == n) {
val group = buf
buf = Vector.empty
List(group)
} else
Nil
}
override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf)
})
}
override def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] = {
require(n > 0, "n must be greater than 0")
require(d > Duration.Zero)
timerTransform("groupedWithin", () new TimerTransformer[Out, immutable.Seq[Out]] {
schedulePeriodically(GroupedWithinTimerKey, d)
var buf: Vector[Out] = Vector.empty
override def onNext(in: Out) = {
buf :+= in
if (buf.size == n) {
// start new time window
schedulePeriodically(GroupedWithinTimerKey, d)
emitGroup()
} else Nil
}
override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf)
override def onTimer(timerKey: Any) = emitGroup()
private def emitGroup(): immutable.Seq[immutable.Seq[Out]] =
if (buf.isEmpty) EmptyImmutableSeq
else {
val group = buf
buf = Vector.empty
List(group)
}
})
}
override def drop(n: Int): Repr[Out] =
transform("drop", () new Transformer[Out, Out] {
var delegate: Transformer[Out, Out] =
if (n <= 0) identityTransformer.asInstanceOf[Transformer[Out, Out]]
else new Transformer[Out, Out] {
var c = n
override def onNext(in: Out) = {
c -= 1
if (c == 0)
delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]]
Nil
}
}
override def onNext(in: Out) = delegate.onNext(in)
})
override def dropWithin(d: FiniteDuration): Repr[Out] =
timerTransform("dropWithin", () new TimerTransformer[Out, Out] {
scheduleOnce(DropWithinTimerKey, d)
var delegate: Transformer[Out, Out] =
new Transformer[Out, Out] {
override def onNext(in: Out) = Nil
}
override def onNext(in: Out) = delegate.onNext(in)
override def onTimer(timerKey: Any) = {
delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]]
Nil
}
})
override def take(n: Int): Repr[Out] =
transform("take", () new Transformer[Out, Out] {
var delegate: Transformer[Out, Out] =
if (n <= 0) takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]]
else new Transformer[Out, Out] {
var c = n
override def onNext(in: Out) = {
c -= 1
if (c == 0)
delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]]
List(in)
}
}
override def onNext(in: Out) = delegate.onNext(in)
override def isComplete = delegate.isComplete
})
override def takeWithin(d: FiniteDuration): Repr[Out] =
timerTransform("takeWithin", () new TimerTransformer[Out, Out] {
scheduleOnce(TakeWithinTimerKey, d)
var delegate: Transformer[Out, Out] = identityTransformer.asInstanceOf[Transformer[Out, Out]]
override def onNext(in: Out) = delegate.onNext(in)
override def isComplete = delegate.isComplete
override def onTimer(timerKey: Any) = {
delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]]
Nil
}
})
override def conflate[S](seed: Out S, aggregate: (S, Out) S): Repr[S] =
andThen(Conflate(seed.asInstanceOf[Any Any], aggregate.asInstanceOf[(Any, Any) Any]))
override def expand[S, U](seed: Out S, extrapolate: S (U, S)): Repr[U] =
andThen(Expand(seed.asInstanceOf[Any Any], extrapolate.asInstanceOf[Any (Any, Any)]))
override def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = {
require(size > 0, s"Buffer size must be larger than zero but was [$size]")
andThen(Buffer(size, overflowStrategy))
}
override def transform[T](name: String, mkTransformer: () Transformer[Out, T]): Repr[T] = {
andThen(Transform(name, mkTransformer.asInstanceOf[() Transformer[Any, Any]]))
}
override def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U])] =
andThen(PrefixAndTail(n))
override def groupBy[K, U >: Out](f: Out K): Repr[(K, Source[U])] =
andThen(GroupBy(f.asInstanceOf[Any Any]))
override def splitWhen[U >: Out](p: Out Boolean): Repr[Source[U]] =
andThen(SplitWhen(p.asInstanceOf[Any Boolean]))
override def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U] = strategy match {
case _: FlattenStrategy.Concat[Out] andThen(ConcatAll)
case _ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getSimpleName}]")
}
override def timerTransform[U](name: String, mkTransformer: () TimerTransformer[Out, U]): Repr[U] =
andThen(TimerTransform(name, mkTransformer.asInstanceOf[() TimerTransformer[Any, Any]]))
}
private[scaladsl2] object Pipe {
private val emptyInstance = Pipe[Any, Any](ops = Nil)
def empty[T]: Pipe[T, T] = emptyInstance.asInstanceOf[Pipe[T, T]]
@ -221,7 +20,7 @@ private[scaladsl2] object Pipe {
/**
* Flow with one open input and one open output..
*/
private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flow[In, Out] with PipeOps[Out] {
private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flow[In, Out] {
override type Repr[+O] = Pipe[In @uncheckedVariance, O]
override protected def andThen[U](op: AstNode): Repr[U] = this.copy(ops = op :: ops)
@ -263,7 +62,7 @@ private[scaladsl2] final case class SinkPipe[-In](output: Drain[_], ops: List[As
/**
* Pipe with open output and attached input. Can be used as a `Publisher`.
*/
private[scaladsl2] final case class SourcePipe[+Out](input: Tap[_], ops: List[AstNode]) extends Source[Out] with PipeOps[Out] {
private[scaladsl2] final case class SourcePipe[+Out](input: Tap[_], ops: List[AstNode]) extends Source[Out] {
override type Repr[+O] = SourcePipe[O]
override protected def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops)

View file

@ -13,5 +13,6 @@ import scala.annotation.unchecked.uncheckedVariance
* Can be used as a `Subscriber`
*/
trait Sink[-In] {
def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance]
def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] =
Flow[In].connect(this).toSubscriber()
}

View file

@ -5,7 +5,8 @@ package akka.stream.scaladsl2
import akka.stream.impl._
import akka.stream.impl2.ActorBasedFlowMaterializer
import akka.stream.{ Transformer, OverflowStrategy, TimerTransformer }
import akka.stream.impl2.Ast.AstNode
import akka.stream.{ scaladsl2, Transformer, OverflowStrategy, TimerTransformer }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
@ -24,12 +25,35 @@ import scala.annotation.unchecked.uncheckedVariance
* FlowMaterializers can be used but must then implement the functionality of these
* Tap nodes themselves (or construct an ActorBasedFlowMaterializer).
*/
trait Tap[+Out] extends Source[Out]
trait Tap[+Out] extends Source[Out] {
override type Repr[+O] = SourcePipe[O]
private def sourcePipe = Pipe.empty[Out].withTap(this)
override def connect[T](flow: Flow[Out, T]): Source[T] = sourcePipe.connect(flow)
override def connect(sink: Sink[Out]): RunnableFlow = sourcePipe.connect(sink)
override def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] =
sourcePipe.toPublisher()(materializer)
override def toFanoutPublisher(initialBufferSize: Int, maximumBufferSize: Int)(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] =
sourcePipe.toFanoutPublisher(initialBufferSize, maximumBufferSize)(materializer)
override def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer): Unit =
sourcePipe.publishTo(subscriber)(materializer)
override def consume()(implicit materializer: FlowMaterializer): Unit =
sourcePipe.consume()
/** INTERNAL API */
override protected def andThen[U](op: AstNode) = SourcePipe(this, List(op))
}
/**
* A tap that does not need to create a user-accessible object during materialization.
*/
trait SimpleTap[+Out] extends Tap[Out] with TapOps[Out] {
trait SimpleTap[+Out] extends Tap[Out] {
/**
* Attach this tap to the given [[org.reactivestreams.Subscriber]]. Using the given
* [[FlowMaterializer]] is completely optional, especially if this tap belongs to
@ -65,7 +89,7 @@ trait SimpleTap[+Out] extends Tap[Out] with TapOps[Out] {
* to retrieve in order to access aspects of this tap (could be a Subscriber, a
* Future/Promise, etc.).
*/
trait TapWithKey[+Out, T] extends TapOps[Out] {
trait TapWithKey[+Out, T] extends Tap[Out] {
/**
* Attach this tap to the given [[org.reactivestreams.Subscriber]]. Using the given
* [[FlowMaterializer]] is completely optional, especially if this tap belongs to
@ -212,68 +236,3 @@ trait MaterializedTap {
*/
def getTapFor[T](tapKey: TapWithKey[_, T]): T
}
trait TapOps[+Out] extends Tap[Out] {
override type Repr[+O] = SourcePipe[O]
private def sourcePipe = Pipe.empty[Out].withTap(this)
override def connect[T](flow: Flow[Out, T]): Source[T] = sourcePipe.connect(flow)
override def connect(sink: Sink[Out]): RunnableFlow = sourcePipe.connect(sink)
override def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] =
sourcePipe.toPublisher()(materializer)
override def toFanoutPublisher(initialBufferSize: Int, maximumBufferSize: Int)(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] =
sourcePipe.toFanoutPublisher(initialBufferSize, maximumBufferSize)(materializer)
override def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer): Unit =
sourcePipe.publishTo(subscriber)(materializer)
override def consume()(implicit materializer: FlowMaterializer): Unit =
sourcePipe.consume()
override def map[T](f: Out T): Repr[T] = sourcePipe.map(f)
override def mapConcat[T](f: Out immutable.Seq[T]): Repr[T] = sourcePipe.mapConcat(f)
override def mapAsync[T](f: Out Future[T]): Repr[T] = sourcePipe.mapAsync(f)
override def mapAsyncUnordered[T](f: Out Future[T]): Repr[T] = sourcePipe.mapAsyncUnordered(f)
override def filter(p: Out Boolean): Repr[Out] = sourcePipe.filter(p)
override def collect[T](pf: PartialFunction[Out, T]): Repr[T] = sourcePipe.collect(pf)
override def grouped(n: Int): Repr[immutable.Seq[Out]] = sourcePipe.grouped(n)
override def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] = sourcePipe.groupedWithin(n, d)
override def drop(n: Int): Repr[Out] = sourcePipe.drop(n)
override def dropWithin(d: FiniteDuration): Repr[Out] = sourcePipe.dropWithin(d)
override def take(n: Int): Repr[Out] = sourcePipe.take(n)
override def takeWithin(d: FiniteDuration): Repr[Out] = sourcePipe.takeWithin(d)
override def conflate[S](seed: Out S, aggregate: (S, Out) S): Repr[S] = sourcePipe.conflate(seed, aggregate)
override def expand[S, U](seed: Out S, extrapolate: S (U, S)): Repr[U] = sourcePipe.expand(seed, extrapolate)
override def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = sourcePipe.buffer(size, overflowStrategy)
override def transform[T](name: String, mkTransformer: () Transformer[Out, T]): Repr[T] = sourcePipe.transform(name, mkTransformer)
override def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U])] = sourcePipe.prefixAndTail(n)
override def groupBy[K, U >: Out](f: Out K): Repr[(K, Source[U])] = sourcePipe.groupBy(f)
override def splitWhen[U >: Out](p: Out Boolean): Repr[Source[U]] = sourcePipe.splitWhen(p)
override def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U] = sourcePipe.flatten(strategy)
override def timerTransform[U](name: String, mkTransformer: () TimerTransformer[Out, U]): Repr[U] =
sourcePipe.timerTransform(name, mkTransformer)
}