!str #16102 Fold Tap/Drain into Source/Sink

* Fold Tap/Drain into Source/Sink
* Create Source/Sink helpers to create all Sources/Sinks
* Make concrete Source/Sink implementations private[scaladsl2]
This commit is contained in:
Björn Antonsson 2014-10-17 14:05:50 +02:00
parent 41ca0522ef
commit 0f61909ea7
100 changed files with 1066 additions and 1178 deletions

View file

@ -3,16 +3,13 @@
*/
package akka.stream.scaladsl2
import akka.stream.impl.EmptyPublisher
import akka.stream.impl.ErrorPublisher
import akka.stream.impl.SynchronousPublisherFromIterable
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, SynchronousPublisherFromIterable }
import org.reactivestreams.Publisher
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Future
import scala.language.higherKinds
import scala.language.implicitConversions
/**
* A `Source` is a set of stream processing steps that has one open output and an attached input.
@ -32,11 +29,11 @@ trait Source[+Out] extends FlowOps[Out] {
def connect(sink: Sink[Out]): RunnableFlow
/**
* Connect this `Source` to a `Drain` and run it. The returned value is the materialized value
* of the `Drain`, e.g. the `Publisher` of a [[PublisherDrain]].
* Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
* of the `Sink`, e.g. the `Publisher` of a [[Sink.fanoutPublisher]].
*/
def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType =
connect(drain).run().materializedDrain(drain)
def runWith(sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType =
connect(sink).run().get(sink)
/**
* Shortcut for running this `Source` with a fold function.
@ -47,7 +44,7 @@ trait Source[+Out] extends FlowOps[Out] {
* if there is an error is signaled in the stream.
*/
def fold[U](zero: U)(f: (U, Out) U)(implicit materializer: FlowMaterializer): Future[U] =
runWith(FoldDrain(zero)(f))
runWith(FoldSink(zero)(f))
/**
* Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked
@ -57,7 +54,7 @@ trait Source[+Out] extends FlowOps[Out] {
* the stream.
*/
def foreach(f: Out Unit)(implicit materializer: FlowMaterializer): Future[Unit] =
runWith(ForeachDrain(f))
runWith(ForeachSink(f))
/**
* Concatenates a second source so that the first element
@ -86,7 +83,7 @@ object Source {
* that mediate the flow of elements downstream and the propagation of
* back-pressure upstream.
*/
def apply[T](publisher: Publisher[T]): Source[T] = PublisherTap(publisher)
def apply[T](publisher: Publisher[T]): Source[T] = PublisherSource(publisher)
/**
* Helper to create [[Source]] from `Iterator`.
@ -98,7 +95,7 @@ object Source {
* in accordance with the demand coming from the downstream transformation
* steps.
*/
def apply[T](iterator: Iterator[T]): Source[T] = IteratorTap(iterator)
def apply[T](iterator: Iterator[T]): Source[T] = IteratorSource(iterator)
/**
* Helper to create [[Source]] from `Iterable`.
@ -109,14 +106,14 @@ object Source {
* stream will see an individual flow of elements (always starting from the
* beginning) regardless of when they subscribed.
*/
def apply[T](iterable: immutable.Iterable[T]): Source[T] = IterableTap(iterable)
def apply[T](iterable: immutable.Iterable[T]): Source[T] = IterableSource(iterable)
/**
* Define the sequence of elements to be produced by the given closure.
* The stream ends normally when evaluation of the closure returns a `None`.
* The stream ends exceptionally when an exception is thrown from the closure.
*/
def apply[T](f: () Option[T]): Source[T] = ThunkTap(f)
def apply[T](f: () Option[T]): Source[T] = ThunkSource(f)
/**
* Start a new `Source` from the given `Future`. The stream will consist of
@ -124,7 +121,7 @@ object Source {
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
*/
def apply[T](future: Future[T]): Source[T] = FutureTap(future)
def apply[T](future: Future[T]): Source[T] = FutureSource(future)
/**
* Elements are produced from the tick closure periodically with the specified interval.
@ -134,7 +131,7 @@ object Source {
* receive new tick elements as soon as it has requested more elements.
*/
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): Source[T] =
TickTap(initialDelay, interval, tick)
TickSource(initialDelay, interval, tick)
/**
* Create a `Source` with one element.
@ -171,10 +168,25 @@ object Source {
val out = block(builder)
builder.partialBuild().toSource(out)
}
/**
* Concatenates two sources so that the first element
* emitted by the second source is emitted after the last element of the first
* source.
*/
def concat[T](source1: Source[T], source2: Source[T]): Source[T] = ConcatTap(source1, source2)
def concat[T](source1: Source[T], source2: Source[T]): Source[T] = ConcatSource(source1, source2)
/**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
*/
def subscriber[T]: SubscriberSource[T] = SubscriberSource[T]
}
/**
* A `Source` that will create an object during materialization that the user will need
* to retrieve in order to access aspects of this source (could be a Subscriber, a
* Future/Promise, etc.).
*/
trait KeyedSource[+Out] extends Source[Out] {
type MaterializedType
}