2014-09-03 21:54:18 +02:00
|
|
|
/**
|
2016-02-23 12:58:39 +01:00
|
|
|
* Copyright (C) 2014-2016 Lightbend Inc. <http://www.lightbend.com>
|
2014-09-03 21:54:18 +02:00
|
|
|
*/
|
2014-10-27 14:35:41 +01:00
|
|
|
package akka.stream.scaladsl
|
2014-09-03 21:54:18 +02:00
|
|
|
|
2016-05-23 11:31:49 +03:00
|
|
|
import akka.stream.impl.Stages.DefaultAttributes
|
2016-01-20 10:00:37 +02:00
|
|
|
import akka.{ Done, NotUsed }
|
2015-04-16 02:24:01 +02:00
|
|
|
import akka.actor.{ ActorRef, Cancellable, Props }
|
2015-10-09 15:11:01 -04:00
|
|
|
import akka.stream.actor.ActorPublisher
|
2015-07-06 22:00:21 +02:00
|
|
|
import akka.stream.impl.StreamLayout.Module
|
2015-12-14 17:02:00 +01:00
|
|
|
import akka.stream.impl.fusing.GraphStages
|
|
|
|
|
import akka.stream.impl.fusing.GraphStages._
|
2015-06-29 23:47:31 -04:00
|
|
|
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ }
|
|
|
|
|
import akka.stream.{ Outlet, SourceShape, _ }
|
2015-07-06 22:00:21 +02:00
|
|
|
import org.reactivestreams.{ Publisher, Subscriber }
|
2015-06-29 23:47:31 -04:00
|
|
|
import scala.annotation.tailrec
|
2015-12-14 17:02:00 +01:00
|
|
|
import scala.annotation.unchecked.uncheckedVariance
|
2014-09-03 21:54:18 +02:00
|
|
|
import scala.collection.immutable
|
2016-08-09 21:08:31 -05:00
|
|
|
import scala.concurrent.duration.FiniteDuration
|
2015-04-16 02:24:01 +02:00
|
|
|
import scala.concurrent.{ Future, Promise }
|
2016-01-21 16:37:26 +01:00
|
|
|
import java.util.concurrent.CompletionStage
|
|
|
|
|
import scala.compat.java8.FutureConverters._
|
2014-09-03 21:54:18 +02:00
|
|
|
|
2014-10-02 17:32:08 +02:00
|
|
|
/**
|
2015-01-28 14:19:50 +01:00
|
|
|
* A `Source` is a set of stream processing steps that has one open output. It can comprise
|
|
|
|
|
* any number of internal sources and transformations that are wired together, or it can be
|
|
|
|
|
* an “atomic” source, e.g. from a collection or a file. Materialization turns a Source into
|
|
|
|
|
* a Reactive Streams `Publisher` (at least conceptually).
|
2014-10-02 17:32:08 +02:00
|
|
|
*/
|
2016-05-03 18:58:26 -07:00
|
|
|
final class Source[+Out, +Mat](override val module: Module)
|
2016-03-31 12:43:38 +02:00
|
|
|
extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] {
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2015-11-25 19:58:48 +01:00
|
|
|
// Type returned by operators that only change the element type (e.g. `via`, `withAttributes`):
// a Source emitting `O` that keeps this Source's materialized value type `Mat`.
override type Repr[+O] = Source[O, Mat @uncheckedVariance]
|
|
|
|
|
// Type returned by operators that may also change the materialized value type
// (e.g. `mapMaterializedValue`): a Source emitting `O` that materializes `M`.
override type ReprMat[+O, +M] = Source[O, M]
|
|
|
|
|
|
|
|
|
|
// Closed-graph counterpart of `Repr`: a RunnableGraph keeping this Source's materialized value type.
override type Closed = RunnableGraph[Mat @uncheckedVariance]
|
|
|
|
|
// Closed-graph counterpart of `ReprMat`: a RunnableGraph materializing `M`.
override type ClosedMat[+M] = RunnableGraph[M]
|
2015-01-28 14:19:50 +01:00
|
|
|
|
|
|
|
|
// The wrapped module's shape viewed as a SourceShape; the cast assumes the module
// passed to the constructor really has a source shape (single outlet of `Out`).
override val shape: SourceShape[Out] = module.shape.asInstanceOf[SourceShape[Out]]
|
|
|
|
|
|
2016-03-11 17:08:30 +01:00
|
|
|
/** Debug rendering made of this source's shape followed by its underlying module. */
override def toString: String = {
  val rendered = s"Source($shape, $module)"
  rendered
}
|
|
|
|
|
|
2015-11-25 19:58:48 +01:00
|
|
|
/**
 * Append `flow` to this source, keeping this source's materialized value and
 * discarding the one of the flow (same as `viaMat(flow)(Keep.left)`).
 */
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] =
  viaMat[T, Mat2, Mat](flow)(Keep.left)
|
|
|
|
|
|
|
|
|
|
/**
 * Append `flow` to this source and compute the resulting materialized value by
 * applying `combine` to this source's value and the flow's value.
 *
 * When `flow` is the identity flow no processing step is added. The identity
 * stage materializes `NotUsed`, so `combine` is still honoured in that case:
 * for `Keep.left` this very instance is returned, for any other combiner only
 * the materialized value is transformed.
 *
 * @param flow    the graph with a flow shape to attach to this source's outlet
 * @param combine function merging both materialized values into the result
 */
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] =
  if (flow.module eq GraphStages.Identity.module) {
    // Identity implies Out == T, hence the casts on the element type are safe.
    if (combine eq Keep.left) this.asInstanceOf[Source[T, Mat3]] // Mat3 == Mat
    else
      // Previously `combine` was silently ignored here, yielding `Mat` typed as `Mat3`.
      mapMaterializedValue[Mat3](mat ⇒ combine(mat, NotUsed.asInstanceOf[Mat2]))
        .asInstanceOf[Source[T, Mat3]]
  } else {
    // Each use of the flow needs its own copy of the module before it is wired in.
    val flowCopy = flow.module.carbonCopy
    new Source(
      module
        .fuse(flowCopy, shape.out, flowCopy.shape.inlets.head, combine)
        .replaceShape(SourceShape(flowCopy.shape.outlets.head)))
  }
|
2014-10-02 17:32:08 +02:00
|
|
|
|
|
|
|
|
/**
 * Attach the given [[akka.stream.scaladsl.Sink]] to this [[akka.stream.scaladsl.Source]],
 * concatenating the processing steps of both. The resulting graph keeps the
 * materialized value of this source (same as `toMat(sink)(Keep.left)`).
 */
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat] =
  toMat[Mat2, Mat](sink)(Keep.left)
|
2015-01-28 14:19:50 +01:00
|
|
|
|
|
|
|
|
/**
 * Attach the given [[akka.stream.scaladsl.Sink]] to this [[akka.stream.scaladsl.Source]],
 * concatenating the processing steps of both. The materialized value of the
 * resulting graph is computed by `combine` from the values of source and sink.
 */
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = {
  // Work on a copy of the sink's module; its first inlet is wired to our outlet.
  val sinkModule = sink.module.carbonCopy
  val sinkInlet = sinkModule.shape.inlets.head
  val fused = module.fuse(sinkModule, shape.out, sinkInlet, combine)
  RunnableGraph(fused)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Apply `f` to the materialized value of this Source; elements and every other
 * property of the Source stay as they were.
 */
override def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] = {
  // The module layer is untyped, so the function is handed over as Any ⇒ Any.
  val untyped = f.asInstanceOf[Any ⇒ Any]
  new Source[Out, Mat2](module.transformMaterializedValue(untyped))
}
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2014-10-02 13:34:27 +02:00
|
|
|
/**
 * Connect this `Source` to the given `Sink` and run the resulting graph. Returns the
 * materialized value of the `Sink`, e.g. the `Publisher` of a
 * [[akka.stream.scaladsl.Sink#publisher]].
 */
def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = {
  val runnable = toMat[Mat2, Mat2](sink)(Keep.right)
  runnable.run()
}
|
2014-09-03 21:54:18 +02:00
|
|
|
|
2014-10-06 14:46:52 +02:00
|
|
|
/**
 * Shortcut for running this `Source` with a fold function.
 * `f` is called for every received element with the previous result (initially
 * `zero`) and the element itself.
 * The returned [[scala.concurrent.Future]] is completed with the value of the last
 * evaluation once the input stream ends, or with `Failure` if a failure is
 * signaled in the stream.
 */
def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: Materializer): Future[U] = {
  val foldSink = Sink.fold[U, Out](zero)(f)
  runWith(foldSink)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Shortcut for running this `Source` with a foldAsync function.
 * `f` is called for every received element with the previous result (initially
 * `zero`) and the element itself, and yields the next result asynchronously.
 * The returned [[scala.concurrent.Future]] is completed with the value of the last
 * evaluation once the input stream ends, or with `Failure` if a failure is
 * signaled in the stream.
 */
def runFoldAsync[U](zero: U)(f: (U, Out) ⇒ Future[U])(implicit materializer: Materializer): Future[U] = {
  val foldSink = Sink.foldAsync[U, Out](zero)(f)
  runWith(foldSink)
}
|
2014-10-06 14:46:52 +02:00
|
|
|
|
2016-01-15 22:51:26 -05:00
|
|
|
/**
 * Shortcut for running this `Source` with a reduce function.
 * Starting with the second element, `f` is called for every received element with
 * the previous result and the element itself.
 * The returned [[scala.concurrent.Future]] is completed with the value of the last
 * evaluation once the input stream ends, or with `Failure` if a failure is
 * signaled in the stream.
 *
 * If the stream is empty (i.e. completes before signalling any elements), the
 * reduce stage fails its downstream with a [[NoSuchElementException]], which is
 * semantically in line with what Scala's standard library collections do in such
 * situations.
 */
def runReduce[U >: Out](f: (U, U) ⇒ U)(implicit materializer: Materializer): Future[U] = {
  val reduceSink = Sink.reduce[U](f)
  runWith(reduceSink)
}
|
|
|
|
|
|
2014-10-06 14:46:52 +02:00
|
|
|
/**
 * Shortcut for running this `Source` with a foreach procedure: `f` is invoked for
 * each received element.
 * The returned [[scala.concurrent.Future]] is completed with `Success` when the
 * stream ends normally, or with `Failure` if a failure is signaled in the stream.
 */
// FIXME: Out => Unit should stay, right??
def runForeach(f: Out ⇒ Unit)(implicit materializer: Materializer): Future[Done] = {
  val foreachSink = Sink.foreach[Out](f)
  runWith(foreachSink)
}
|
2014-10-06 14:46:52 +02:00
|
|
|
|
2015-07-06 22:00:21 +02:00
|
|
|
/**
 * Replace the attributes of this [[Source]] with the given ones and seal the list
 * of attributes: later calls cannot remove these attributes, they can only add
 * new ones. Note that this operation has no effect on an empty Flow (because the
 * attributes apply only to the contained processing stages).
 */
override def withAttributes(attr: Attributes): Repr[Out] = {
  val reattributed = module.withAttributes(attr)
  new Source(reattributed)
}
|
2014-12-01 20:07:55 +02:00
|
|
|
|
2015-12-22 20:56:02 +01:00
|
|
|
/**
 * Add the given attributes to the ones this Source already carries. Further calls
 * to `withAttributes` will not remove them. Note that this operation has no effect
 * on an empty Flow (because the attributes apply only to the contained processing
 * stages).
 */
override def addAttributes(attr: Attributes): Repr[Out] = {
  val merged = module.attributes.and(attr)
  withAttributes(merged)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Attach a ``name`` attribute to this Source.
 */
override def named(name: String): Repr[Out] = {
  val nameAttribute = Attributes.name(name)
  addAttributes(nameAttribute)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Surround this `Source` with an asynchronous boundary.
 */
override def async: Repr[Out] = {
  val boundary = Attributes.asyncBoundary
  addAttributes(boundary)
}
|
2015-04-14 08:59:37 +02:00
|
|
|
|
2016-08-11 07:37:54 -05:00
|
|
|
/**
 * Converts this Scala DSL element to its Java DSL counterpart, which wraps this instance.
 */
def asJava: javadsl.Source[Out, Mat] =
  new javadsl.Source[Out, Mat](this)
|
2015-06-29 23:47:31 -04:00
|
|
|
|
|
|
|
|
/**
 * Combines several sources with a fan-in strategy like `Merge` or `Concat` and
 * returns a single `Source`.
 *
 * This instance method does not involve the source it is called on; it simply
 * forwards to `combine` on the companion object so that there is only one
 * implementation of the wiring logic.
 *
 * @param first    the source connected to input 0 of the fan-in stage
 * @param second   the source connected to input 1 of the fan-in stage
 * @param rest     further sources, connected to inputs 2, 3, ... in order
 * @param strategy factory that creates the fan-in stage for a given number of inputs
 */
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] =
  Source.combine(first, second, rest: _*)(strategy)
|
2014-10-02 17:32:08 +02:00
|
|
|
}
|
|
|
|
|
|
2015-10-21 22:45:39 +02:00
|
|
|
object Source {
|
2015-04-16 02:24:01 +02:00
|
|
|
/** INTERNAL API: builds a source shape whose single outlet is called `name` followed by ".out". */
def shape[T](name: String): SourceShape[T] = {
  val outlet = Outlet[T](name + ".out")
  SourceShape(outlet)
}
|
2015-01-28 14:19:50 +01:00
|
|
|
|
|
|
|
|
/**
 * Helper to create a [[Source]] from a `Publisher`.
 *
 * Constructs a transformation that starts with the given publisher. The
 * transformation steps are executed by a series of
 * [[org.reactivestreams.Processor]] instances that mediate the flow of elements
 * downstream and the propagation of back-pressure upstream.
 */
def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] = {
  val publisherModule =
    new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource"))
  new Source(publisherModule)
}
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2014-09-03 21:54:18 +02:00
|
|
|
/**
 * Helper to create a [[Source]] from an `Iterator`.
 * Example usage: `Source.fromIterator(() => Iterator.from(0))`
 *
 * Starts a new `Source` from the given function that produces an Iterator.
 * The produced stream of elements continues until the iterator runs empty or
 * fails during evaluation of the `next()` method.
 * Elements are pulled out of the iterator in accordance with the demand coming
 * from the downstream transformation steps.
 */
def fromIterator[T](f: () ⇒ Iterator[T]): Source[T, NotUsed] = {
  // Wrap the factory in an Iterable so that `f` is invoked whenever an iterator is requested.
  val iteratorFactory: immutable.Iterable[T] = new immutable.Iterable[T] {
    override def iterator: Iterator[T] = f()
    override def toString: String = "() => Iterator"
  }
  apply(iteratorFactory)
}
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2016-03-22 21:59:52 -05:00
|
|
|
/**
 * Create a [[Source]] that will continually produce the given elements in the specified order.
 *
 * Starts a new 'cycled' `Source` from the given elements. The produced stream of
 * elements continues infinitely by repeating the sequence of elements provided
 * by the function parameter.
 *
 * The cycling iterator is created anew each time an iterator is requested from
 * the returned source, so separate runs do not share (and concurrently mutate)
 * one iterator instance.
 *
 * @param f factory for the iterator whose elements are repeated; it is invoked
 *          again each time the previous iterator is exhausted. If it returns an
 *          empty iterator an `IllegalArgumentException("empty iterator")` is
 *          thrown while the stream is pulling elements.
 */
def cycle[T](f: () ⇒ Iterator[T]): Source[T, NotUsed] = {
  // Lazily chains fresh iterators from `f`; nothing is evaluated until the first pull.
  def cycled(): Iterator[T] =
    Iterator.continually {
      val i = f()
      if (i.isEmpty) throw new IllegalArgumentException("empty iterator") else i
    }.flatten

  fromIterator(() ⇒ cycled()).withAttributes(DefaultAttributes.cycledSource)
}
|
|
|
|
|
|
2015-01-28 14:19:50 +01:00
|
|
|
/**
 * A graph with the shape of a source is logically a source; this method makes it
 * one in type as well. Scala DSL sources are returned unchanged, Java DSL sources
 * are unwrapped, any other graph gets wrapped around its module.
 */
def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] =
  g match {
    case scalaSource: Source[T, M]        ⇒ scalaSource
    case javaSource: javadsl.Source[T, M] ⇒ javaSource.asScala
    case anyGraph                         ⇒ new Source(anyGraph.module)
  }
|
2014-09-03 21:54:18 +02:00
|
|
|
|
|
|
|
|
/**
 * Helper to create a [[Source]] from an `Iterable`.
 * Example usage: `Source(Seq(1,2,3))`
 *
 * Starts a new `Source` from the given `Iterable`. This is like starting from an
 * Iterator, but every Subscriber directly attached to the Publisher of this
 * stream will see an individual flow of elements (always starting from the
 * beginning) regardless of when they subscribed.
 */
def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = {
  // Emit the collection as one element, then flatten it into its members.
  val flattened: Source[T, NotUsed] = single(iterable).mapConcat(ConstantFun.scalaIdentityFunction)
  flattened.withAttributes(DefaultAttributes.iterableSource)
}
|
2015-01-28 14:19:50 +01:00
|
|
|
|
|
|
|
|
/**
 * Start a new `Source` from the given `Future`. The stream will consist of one
 * element when the `Future` is completed with a successful value, which may
 * happen before or after materializing the `Source`.
 * The stream terminates with a failure if the `Future` is completed with a failure.
 */
def fromFuture[T](future: Future[T]): Source[T, NotUsed] = {
  val stage = new FutureSource(future)
  fromGraph(stage)
}
|
2014-09-03 21:54:18 +02:00
|
|
|
|
2016-01-21 16:37:26 +01:00
|
|
|
/**
 * Start a new `Source` from the given `CompletionStage`. The stream will consist
 * of one element when the `CompletionStage` is completed with a successful value,
 * which may happen before or after materializing the `Source`.
 * The stream terminates with a failure if the `CompletionStage` is completed with a failure.
 */
def fromCompletionStage[T](future: CompletionStage[T]): Source[T, NotUsed] = {
  // Bridge to a Scala Future and reuse the Future-based stage.
  val scalaFuture: Future[T] = future.toScala
  fromGraph(new FutureSource(scalaFuture))
}
|
|
|
|
|
|
2014-09-03 21:54:18 +02:00
|
|
|
/**
 * Elements are emitted periodically with the specified interval.
 * The tick element is delivered to downstream consumers that have requested
 * elements. A consumer that has not requested any elements at the point in time
 * when the tick element is produced will not receive that tick later; it will
 * receive new tick elements as soon as it has requested more elements.
 */
def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] = {
  val ticker = new TickSource[T](initialDelay, interval, tick)
  fromGraph(ticker)
}
|
2014-10-02 13:34:27 +02:00
|
|
|
|
2014-10-27 09:48:54 +02:00
|
|
|
/**
 * Create a `Source` with one element.
 * Every connected `Sink` of this stream will see an individual stream consisting
 * of that one element.
 */
def single[T](element: T): Source[T, NotUsed] = {
  val stage = new GraphStages.SingleSource(element)
  fromGraph(stage)
}
|
2014-10-27 09:48:54 +02:00
|
|
|
|
2015-02-26 12:36:46 +01:00
|
|
|
/**
 * Create a `Source` that will continually emit the given element.
 */
def repeat[T](element: T): Source[T, NotUsed] = {
  // Allocated once: every unfold step yields the same (state, element) pair.
  val constantStep = Some((element, element))
  val repeated: Source[T, NotUsed] = unfold(element)(_ ⇒ constantStep)
  repeated.withAttributes(DefaultAttributes.repeat)
}
|
2015-02-26 12:36:46 +01:00
|
|
|
|
2015-12-10 12:49:59 +02:00
|
|
|
/**
 * Create a `Source` that will unfold a value of type `S` into
 * a pair of the next state `S` and output elements of type `E`.
 *
 * For example, all the Fibonacci numbers under 10M:
 *
 * {{{
 *   Source.unfold(0 → 1) {
 *    case (a, _) if a > 10000000 ⇒ None
 *    case (a, b) ⇒ Some((b → (a + b)) → a)
 *   }
 * }}}
 */
def unfold[S, E](s: S)(f: S ⇒ Option[(S, E)]): Source[E, NotUsed] = {
  val stage = new Unfold(s, f)
  fromGraph(stage)
}
|
2015-12-10 12:49:59 +02:00
|
|
|
|
|
|
|
|
/**
 * Same as [[unfold]], but uses an async function to generate the next state-element tuple.
 *
 * Async Fibonacci example:
 *
 * {{{
 *   Source.unfoldAsync(0 → 1) {
 *    case (a, _) if a > 10000000 ⇒ Future.successful(None)
 *    case (a, b) ⇒ Future{
 *      Thread.sleep(1000)
 *      Some((b → (a + b)) → a)
 *    }
 *   }
 * }}}
 */
def unfoldAsync[S, E](s: S)(f: S ⇒ Future[Option[(S, E)]]): Source[E, NotUsed] = {
  val stage = new UnfoldAsync(s, f)
  fromGraph(stage)
}
|
2015-12-10 12:49:59 +02:00
|
|
|
|
2014-10-27 09:48:54 +02:00
|
|
|
/**
 * A `Source` with no elements, i.e. an empty stream that is completed immediately
 * for every connected `Sink`.
 */
def empty[T]: Source[T, NotUsed] = _empty

// Single shared instance behind `empty`; usable for any element type since it emits nothing.
private[this] val _empty: Source[Nothing, NotUsed] = {
  val emptyModule =
    new PublisherSource[Nothing](EmptyPublisher, DefaultAttributes.emptySource, shape("EmptySource"))
  new Source(emptyModule)
}
|
2014-10-27 09:48:54 +02:00
|
|
|
|
2015-01-29 10:21:54 +01:00
|
|
|
/**
 * Create a `Source` which materializes a [[scala.concurrent.Promise]] which controls
 * what element will be emitted by the Source.
 *
 *  - If the materialized promise is completed with a Some, that value will be
 *    produced downstream, followed by completion.
 *  - If the materialized promise is completed with a None, no value will be
 *    produced downstream and completion will be signalled immediately.
 *  - If the materialized promise is completed with a failure, then the returned
 *    source will terminate with that error.
 *  - If the downstream of this source cancels before the promise has been
 *    completed, then the promise will be completed with None.
 */
def maybe[T]: Source[T, Promise[Option[T]]] = {
  val maybeModule = new MaybeSource[T](DefaultAttributes.maybeSource, shape("MaybeSource"))
  new Source(maybeModule)
}
|
2015-01-28 14:19:50 +01:00
|
|
|
|
|
|
|
|
/**
 * Create a `Source` that immediately ends the stream with the `cause` error to
 * every connected `Sink`.
 */
def failed[T](cause: Throwable): Source[T, NotUsed] = {
  val failingPublisher = ErrorPublisher(cause, "FailedSource")[T]
  val failedModule =
    new PublisherSource(failingPublisher, DefaultAttributes.failedSource, shape("FailedSource"))
  new Source(failedModule)
}
|
2014-10-27 09:48:54 +02:00
|
|
|
|
2016-11-25 16:25:26 +01:00
|
|
|
/**
 * Creates a `Source` that is not materialized until there is downstream demand.
 * When the source gets materialized, the materialized future is completed with
 * its value. If downstream cancels or fails without any demand, the `create`
 * factory is never called and the materialized `Future` is failed.
 */
def lazily[T, M](create: () ⇒ Source[T, M]): Source[T, Future[M]] = {
  val lazyStage = new LazySource[T, M](create)
  fromGraph(lazyStage)
}
|
|
|
|
|
|
2014-10-17 14:05:50 +02:00
|
|
|
/**
 * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]].
 */
def asSubscriber[T]: Source[T, Subscriber[T]] = {
  val subscriberModule =
    new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource"))
  new Source(subscriberModule)
}
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2015-03-31 15:13:57 +02:00
|
|
|
/**
 * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which
 * points to an Actor created according to the passed in [[akka.actor.Props]].
 * The Actor created by the `props` must be an [[akka.stream.actor.ActorPublisher]];
 * otherwise an `IllegalArgumentException` is thrown by this method.
 */
def actorPublisher[T](props: Props): Source[T, ActorRef] = {
  val isPublisher = classOf[ActorPublisher[_]].isAssignableFrom(props.actorClass())
  require(isPublisher, "Actor must be ActorPublisher")

  val publisherModule =
    new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource"))
  new Source(publisherModule)
}
|
2015-03-31 15:13:57 +02:00
|
|
|
|
|
|
|
|
/**
 * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
 * Messages sent to this actor will be emitted to the stream if there is demand
 * from downstream, otherwise they will be buffered until request for demand is
 * received.
 *
 * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements
 * if there is no space available in the buffer.
 *
 * The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and
 * an IllegalArgument("Backpressure overflowStrategy not supported") will be thrown
 * if it is passed as argument.
 *
 * The buffer can be disabled by using `bufferSize` of 0 and then received messages
 * are dropped if there is no demand from downstream. When `bufferSize` is 0 the
 * `overflowStrategy` does not matter. An async boundary is added after this
 * Source; as such, it is never safe to assume the downstream will always generate
 * demand.
 *
 * The stream can be completed successfully by sending the actor reference a
 * [[akka.actor.Status.Success]] (whose content will be ignored) in which case
 * already buffered elements will be signaled before signaling completion, or by
 * sending [[akka.actor.PoisonPill]] in which case completion will be signaled
 * immediately.
 *
 * The stream can be completed with failure by sending a
 * [[akka.actor.Status.Failure]] to the actor reference. In case the Actor is still
 * draining its internal buffer (after having received a
 * [[akka.actor.Status.Success]]) before signaling completion and it receives a
 * [[akka.actor.Status.Failure]], the failure will be signaled downstream
 * immediately (instead of the completion signal).
 *
 * The actor will be stopped when the stream is completed, failed or canceled from
 * downstream, i.e. you can watch it to get notified when that happens.
 *
 * See also [[akka.stream.scaladsl.Source.queue]].
 *
 * @param bufferSize The size of the buffer in element count
 * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
 */
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
  // Validate arguments up front, in this order, before any module is created.
  require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
  require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")

  val actorRefModule =
    new ActorRefSource(bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource"))
  new Source(actorRefModule)
}
|
|
|
|
|
|
2015-06-29 23:47:31 -04:00
|
|
|
/**
 * Combines several sources with a fan-in strategy like `Merge` or `Concat` and
 * returns a single `Source`. `strategy` is asked for a fan-in stage with as many
 * inputs as there are sources; `first` goes to input 0, `second` to input 1 and
 * the `rest` to the following inputs in order.
 */
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] = {
  val combined = GraphDSL.create() { implicit builder ⇒
    import GraphDSL.Implicits._

    val inputCount = rest.size + 2
    val fanIn = builder.add(strategy(inputCount))
    first ~> fanIn.in(0)
    second ~> fanIn.in(1)

    // Wires the remaining sources to consecutive inputs, then exposes the fan-in outlet.
    @tailrec def wireRemaining(port: Int, remaining: Iterator[Source[T, _]]): SourceShape[U] =
      if (!remaining.hasNext) SourceShape(fanIn.out)
      else {
        remaining.next() ~> fanIn.in(port)
        wireRemaining(port + 1, remaining)
      }

    wireRemaining(2, rest.iterator)
  }
  Source.fromGraph(combined)
}
|
|
|
|
|
|
2016-04-22 12:04:28 +02:00
|
|
|
/**
 * Combine the elements of multiple streams into a stream of sequences.
 */
def zipN[T](sources: immutable.Seq[Source[T, _]]): Source[immutable.Seq[T], NotUsed] = {
  // Zipping with the identity combiner leaves each group of elements as a sequence.
  val zipped: Source[immutable.Seq[T], NotUsed] =
    zipWithN(ConstantFun.scalaIdentityFunction[immutable.Seq[T]])(sources)
  zipped.addAttributes(DefaultAttributes.zipN)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Combine the elements of multiple streams into a stream of sequences using a combiner function.
 *
 * No sources give an empty stream, a single source has each of its elements
 * passed to `zipper` as a one-element sequence, and two or more sources are
 * joined through a `ZipWithN` fan-in stage.
 */
def zipWithN[T, O](zipper: immutable.Seq[T] ⇒ O)(sources: immutable.Seq[Source[T, _]]): Source[O, NotUsed] = {
  val zipped = sources match {
    case immutable.Seq() ⇒
      empty[O]
    case immutable.Seq(only) ⇒
      only.map(elem ⇒ zipper(immutable.Seq(elem))).mapMaterializedValue(_ ⇒ NotUsed)
    case head +: next +: others ⇒
      combine(head, next, others: _*)(ZipWithN(zipper))
  }

  zipped.addAttributes(DefaultAttributes.zipWithN)
}
|
|
|
|
|
|
2015-08-19 23:04:20 -04:00
|
|
|
/**
 * Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueue]].
 * You can push elements to the queue and they will be emitted to the stream if
 * there is demand from downstream, otherwise they will be buffered until request
 * for demand is received. Elements in the buffer will be discarded if downstream
 * is terminated.
 *
 * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements
 * if there is no space available in the buffer.
 *
 * Acknowledgement mechanism is available.
 * [[akka.stream.scaladsl.SourceQueue.offer]] returns `Future[QueueOfferResult]`
 * which completes with `QueueOfferResult.Enqueued` if the element was added to the
 * buffer or sent downstream. It completes with `QueueOfferResult.Dropped` if the
 * element was dropped. It can also complete with `QueueOfferResult.Failure` when
 * the stream failed, or `QueueOfferResult.QueueClosed` when downstream is completed.
 *
 * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete the
 * last `offer():Future` call when the buffer is full.
 *
 * You can watch accessibility of the stream with
 * [[akka.stream.scaladsl.SourceQueue.watchCompletion]]. It returns a future that
 * completes with success when the stream is completed, or fails when the stream
 * is failed.
 *
 * The buffer can be disabled by using `bufferSize` of 0 and then a received
 * message will wait for downstream demand unless there is another message waiting
 * for downstream demand, in that case the offer result will be completed according
 * to the overflow strategy.
 *
 * The SourceQueue that this source is materialized to is for single thread usage only.
 *
 * @param bufferSize size of buffer in element count
 * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
 */
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = {
  val queueStage = new QueueSource[T](bufferSize, overflowStrategy)
  fromGraph(queueStage.withAttributes(DefaultAttributes.queueSource))
}
|
2015-08-19 23:04:20 -04:00
|
|
|
|
2016-02-22 23:22:47 -05:00
|
|
|
/**
 * Start a new `Source` from some resource which can be opened, read and closed.
 * Interaction with the resource happens in a blocking way.
 *
 * Example:
 * {{{
 * Source.unfoldResource(
 *   () => new BufferedReader(new FileReader("...")),
 *   reader => Option(reader.readLine()),
 *   reader => reader.close())
 * }}}
 *
 * You can use the supervision strategy to handle exceptions for the `read`
 * function. All exceptions thrown by `create` or `close` will fail the stream.
 *
 * `Restart` supervision strategy will close and create blocking IO again. Default
 * strategy is `Stop` which means that the stream will be terminated on error in
 * the `read` function by default.
 *
 * You can configure the default dispatcher for this Source by changing the
 * `akka.stream.blocking-io-dispatcher` or set it for a given Source by using
 * [[ActorAttributes]].
 *
 * @param create - function that is called on stream start and creates/opens resource.
 * @param read - function that reads data from opened resource. It is called each time backpressure signal
 *             is received. Stream calls close and completes when `read` returns None.
 * @param close - function that closes resource
 */
def unfoldResource[T, S](create: () ⇒ S, read: (S) ⇒ Option[T], close: (S) ⇒ Unit): Source[T, NotUsed] = {
  val resourceStage = new UnfoldResourceSource(create, read, close)
  fromGraph(resourceStage)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Start a new `Source` from some resource which can be opened, read and closed.
 * It's similar to `unfoldResource` but takes functions that return `Futures`
 * instead of plain values.
 *
 * You can use the supervision strategy to handle exceptions for the `read`
 * function or failures of produced `Futures`. All exceptions thrown by `create`
 * or `close`, as well as failures of the futures they return, will fail the stream.
 *
 * `Restart` supervision strategy will close and create the resource again. Default
 * strategy is `Stop` which means that the stream will be terminated on error in
 * the `read` function (or future) by default.
 *
 * You can configure the default dispatcher for this Source by changing the
 * `akka.stream.blocking-io-dispatcher` or set it for a given Source by using
 * [[ActorAttributes]].
 *
 * @param create - function that is called on stream start and creates/opens resource.
 * @param read - function that reads data from opened resource. It is called each time backpressure signal
 *             is received. Stream calls close and completes when `Future` from read function returns None.
 * @param close - function that closes resource
 */
def unfoldResourceAsync[T, S](create: () ⇒ Future[S], read: (S) ⇒ Future[Option[T]], close: (S) ⇒ Future[Done]): Source[T, NotUsed] = {
  val resourceStage = new UnfoldResourceSourceAsync(create, read, close)
  fromGraph(resourceStage)
}
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|