/**
 * Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
 */
package akka.stream.scaladsl

import akka.stream.impl.Stages.DefaultAttributes
import akka.util.ConstantFun
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Cancellable, Props }
import akka.stream.actor.ActorPublisher
import akka.stream.impl.fusing.GraphStages
import akka.stream.impl.fusing.GraphStages._
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, PublisherSource, _ }
import akka.stream.{ Outlet, SourceShape, _ }
import org.reactivestreams.{ Publisher, Subscriber }

import scala.annotation.tailrec
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise }
import java.util.concurrent.CompletionStage

import scala.compat.java8.FutureConverters._

/**
 * A `Source` is a set of stream processing steps that has one open output. It can comprise
 * any number of internal sources and transformations that are wired together, or it can be
 * an “atomic” source, e.g. from a collection or a file. Materialization turns a Source into
 * a Reactive Streams `Publisher` (at least conceptually).
 */
final class Source[+Out, +Mat](
  override val traversalBuilder: LinearTraversalBuilder,
  override val shape:            SourceShape[Out])
  extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] {

  override type Repr[+O] = Source[O, Mat @uncheckedVariance]
  override type ReprMat[+O, +M] = Source[O, M]
  override type Closed = RunnableGraph[Mat @uncheckedVariance]
  override type ClosedMat[+M] = RunnableGraph[M]

  override def toString: String = s"Source($shape)"

  override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left)

  override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = {
    val toAppend =
      if (flow.traversalBuilder eq Flow.identityTraversalBuilder)
        LinearTraversalBuilder.empty()
      else
        flow.traversalBuilder

    new Source[T, Mat3](
      traversalBuilder.append(toAppend, flow.shape, combine),
      SourceShape(flow.shape.out))
  }

  /**
   * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]],
   * concatenating the processing steps of both.
   */
  def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat] = toMat(sink)(Keep.left)

  /**
   * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]],
   * concatenating the processing steps of both.
   */
  def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = {
    RunnableGraph(traversalBuilder.append(sink.traversalBuilder, sink.shape, combine))
  }
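
  /* Example (illustrative sketch, assuming an implicit `Materializer` is in scope):
   * `to` keeps the materialized value of the Source, while `toMat` lets you choose
   * how to combine both materialized values.
   *
   *   val runnable: RunnableGraph[Future[Done]] =
   *     Source(1 to 3).toMat(Sink.foreach(println))(Keep.right)
   *   val done: Future[Done] = runnable.run()
   */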

  /**
   * Transform only the materialized value of this Source, leaving all other properties as they were.
   */
  override def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] =
    new Source[Out, Mat2](traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]), shape)

  /**
   * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
   * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]].
   */
  def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = toMat(sink)(Keep.right).run()
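
  /* Example (illustrative sketch, assuming an implicit `Materializer` is in scope):
   * `runWith` is shorthand for `toMat(sink)(Keep.right).run()`, so the Sink's
   * materialized value is returned directly.
   *
   *   val firstElement: Future[Int] = Source(1 to 10).runWith(Sink.head)
   */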

  /**
   * Shortcut for running this `Source` with a fold function.
   * The given function is invoked for every received element, giving it its previous
   * output (or the given `zero` value) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with the value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   */
  def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: Materializer): Future[U] = runWith(Sink.fold(zero)(f))
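
  /* Example (illustrative sketch, assuming an implicit `Materializer` is in scope):
   * folding a finite stream into a single value.
   *
   *   val sum: Future[Int] = Source(1 to 100).runFold(0)(_ + _)
   */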

  /**
   * Shortcut for running this `Source` with a foldAsync function.
   * The given function is invoked for every received element, giving it its previous
   * output (or the given `zero` value) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with the value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   */
  def runFoldAsync[U](zero: U)(f: (U, Out) ⇒ Future[U])(implicit materializer: Materializer): Future[U] = runWith(Sink.foldAsync(zero)(f))

  /**
   * Shortcut for running this `Source` with a reduce function.
   * The given function is invoked for every received element, giving it its previous
   * output (from the second element) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with the value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   *
   * If the stream is empty (i.e. completes before signalling any elements),
   * the reduce stage will fail its downstream with a [[NoSuchElementException]],
   * which is semantically in-line with what Scala's standard library collections
   * do in such situations.
   */
  def runReduce[U >: Out](f: (U, U) ⇒ U)(implicit materializer: Materializer): Future[U] =
    runWith(Sink.reduce(f))
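
  /* Example (illustrative sketch, assuming an implicit `Materializer` is in scope):
   * `runReduce` needs no `zero` element, but fails for empty streams.
   *
   *   val max: Future[Int] = Source(List(3, 1, 2)).runReduce(_ max _)
   *   val failed: Future[Int] = Source.empty[Int].runReduce(_ max _) // fails with NoSuchElementException
   */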

  /**
   * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked
   * for each received element.
   * The returned [[scala.concurrent.Future]] will be completed with `Success` when reaching the
   * normal end of the stream, or completed with `Failure` if there is a failure signaled in
   * the stream.
   */
  // FIXME: Out => Unit should stay, right??
  def runForeach(f: Out ⇒ Unit)(implicit materializer: Materializer): Future[Done] = runWith(Sink.foreach(f))
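
  /* Example (illustrative sketch, assuming an implicit `Materializer` is in scope):
   * the returned future completes when the whole stream has been consumed.
   *
   *   val done: Future[Done] = Source(1 to 3).runForeach(println)
   */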

  /**
   * Change the attributes of this [[Source]] to the given ones and seal the list
   * of attributes. This means that further calls will not be able to remove these
   * attributes, but instead add new ones. Note that this
   * operation has no effect on an empty Flow (because the attributes apply
   * only to the contained processing stages).
   */
  override def withAttributes(attr: Attributes): Repr[Out] =
    new Source(traversalBuilder.setAttributes(attr), shape)

  /**
   * Add the given attributes to this Source. Further calls to `withAttributes`
   * will not remove these attributes. Note that this
   * operation has no effect on an empty Flow (because the attributes apply
   * only to the contained processing stages).
   */
  override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(traversalBuilder.attributes and attr)

  /**
   * Add a ``name`` attribute to this Source.
   */
  override def named(name: String): Repr[Out] = addAttributes(Attributes.name(name))

  /**
   * Put an asynchronous boundary around this `Source`
   */
  override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary)
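
  /* Example (illustrative sketch): attributes such as a name or an async boundary
   * can be attached fluently while building a stream.
   *
   *   val namedSource: Source[Int, NotUsed] =
   *     Source(1 to 10).named("numbers").async
   */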

  /**
   * Converts this Scala DSL element to its Java DSL counterpart.
   */
  def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this)
}

object Source {
  /** INTERNAL API */
  def shape[T](name: String): SourceShape[T] = SourceShape(Outlet(name + ".out"))

  /**
   * Helper to create [[Source]] from `Publisher`.
   *
   * Construct a transformation starting with given publisher. The transformation steps
   * are executed by a series of [[org.reactivestreams.Processor]] instances
   * that mediate the flow of elements downstream and the propagation of
   * back-pressure upstream.
   */
  def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] =
    fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource")))
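
  /* Example (illustrative sketch, assuming an implicit `Materializer` is in scope):
   * any Reactive Streams `Publisher` can be wrapped, e.g. one previously
   * materialized from another stream via `Sink.asPublisher`.
   *
   *   val pub: Publisher[Int] = Source(1 to 5).runWith(Sink.asPublisher(fanout = false))
   *   val source: Source[Int, NotUsed] = Source.fromPublisher(pub)
   */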

  /**
   * Helper to create [[Source]] from `Iterator`.
   * Example usage: `Source.fromIterator(() => Iterator.from(0))`
   *
   * Start a new `Source` from the given function that produces an Iterator.
   * The produced stream of elements will continue until the iterator runs empty
   * or fails during evaluation of the `next()` method.
   * Elements are pulled out of the iterator in accordance with the demand coming
   * from the downstream transformation steps.
   */
  def fromIterator[T](f: () ⇒ Iterator[T]): Source[T, NotUsed] =
    apply(new immutable.Iterable[T] {
      override def iterator: Iterator[T] = f()
      override def toString: String = "() => Iterator"
    })

  /**
   * Creates [[Source]] that will continually produce given elements in specified order.
   *
   * Starts a new 'cycled' `Source` from the given elements. The produced stream of elements
   * will continue infinitely by repeating the sequence of elements provided by function parameter.
   */
  def cycle[T](f: () ⇒ Iterator[T]): Source[T, NotUsed] = {
    val iterator = Iterator.continually { val i = f(); if (i.isEmpty) throw new IllegalArgumentException("empty iterator") else i }.flatten
    fromIterator(() ⇒ iterator).withAttributes(DefaultAttributes.cycledSource)
  }
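
  /* Example (illustrative sketch): endlessly repeat a fixed sequence.
   *
   *   val abc: Source[String, NotUsed] =
   *     Source.cycle(() => List("a", "b", "c").iterator)
   */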

  /**
   * A graph with the shape of a source logically is a source, this method makes
   * it so also in type.
   */
  def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match {
    case s: Source[T, M]         ⇒ s
    case s: javadsl.Source[T, M] ⇒ s.asScala
    case other ⇒ new Source(
      LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right),
      other.shape)
  }

  /**
   * Helper to create [[Source]] from `Iterable`.
   * Example usage: `Source(Seq(1,2,3))`
   *
   * Starts a new `Source` from the given `Iterable`. This is like starting from an
   * Iterator, but every Subscriber directly attached to the Publisher of this
   * stream will see an individual flow of elements (always starting from the
   * beginning) regardless of when they subscribed.
   */
  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
    single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource)

  /**
   * Starts a new `Source` from the given `Future`. The stream will consist of
   * one element when the `Future` is completed with a successful value, which
   * may happen before or after materializing the `Flow`.
   * The stream terminates with a failure if the `Future` is completed with a failure.
   */
  def fromFuture[T](future: Future[T]): Source[T, NotUsed] =
    fromGraph(new FutureSource(future))
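
  /* Example (illustrative sketch): emit the single result of an asynchronous
   * computation once it completes. `fetchUserName()` is a hypothetical async call.
   *
   *   val userName: Future[String] = fetchUserName() // hypothetical
   *   val source: Source[String, NotUsed] = Source.fromFuture(userName)
   */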

  /**
   * Starts a new `Source` from the given `CompletionStage`. The stream will consist of
   * one element when the `CompletionStage` is completed with a successful value, which
   * may happen before or after materializing the `Flow`.
   * The stream terminates with a failure if the `CompletionStage` is completed with a failure.
   */
  def fromCompletionStage[T](future: CompletionStage[T]): Source[T, NotUsed] =
    fromGraph(new FutureSource(future.toScala))

  /**
   * Streams the elements of the given future source once it successfully completes.
   * If the future fails the stream is failed.
   */
  def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): Source[T, Future[M]] = fromGraph(new FutureFlattenSource(future))

  /**
   * Streams the elements of an asynchronous source once its given `completion` stage completes.
   * If the `completion` fails the stream is failed with that exception.
   */
  def fromSourceCompletionStage[T, M](completion: CompletionStage[Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava)

  /**
   * Elements are emitted periodically with the specified interval.
   * The tick element will be delivered to downstream consumers that have requested any elements.
   * If a consumer has not requested any elements at the point in time when the tick
   * element is produced it will not receive that tick element later. It will
   * receive new tick elements as soon as it has requested more elements.
   */
  def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] =
    fromGraph(new TickSource[T](initialDelay, interval, tick))
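
  /* Example (illustrative sketch): emit a heartbeat string every second; the
   * materialized `Cancellable` can be used to stop it later.
   *
   *   import scala.concurrent.duration._
   *   val ticks: Source[String, Cancellable] = Source.tick(0.seconds, 1.second, "tick")
   */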

  /**
   * Create a `Source` with one element.
   * Every connected `Sink` of this stream will see an individual stream consisting of one element.
   */
  def single[T](element: T): Source[T, NotUsed] =
    fromGraph(new GraphStages.SingleSource(element))

  /**
   * Create a `Source` that will continually emit the given element.
   */
  def repeat[T](element: T): Source[T, NotUsed] = {
    val next = Some((element, element))
    unfold(element)(_ ⇒ next).withAttributes(DefaultAttributes.repeat)
  }

  /**
   * Create a `Source` that will unfold a value of type `S` into
   * a pair of the next state `S` and output elements of type `E`.
   *
   * For example, all the Fibonacci numbers under 10M:
   *
   * {{{
   *   Source.unfold(0 → 1) {
   *     case (a, _) if a > 10000000 ⇒ None
   *     case (a, b)                 ⇒ Some((b → (a + b)) → a)
   *   }
   * }}}
   */
  def unfold[S, E](s: S)(f: S ⇒ Option[(S, E)]): Source[E, NotUsed] =
    Source.fromGraph(new Unfold(s, f))

  /**
   * Same as [[unfold]], but uses an async function to generate the next state-element tuple.
   *
   * async fibonacci example:
   *
   * {{{
   *   Source.unfoldAsync(0 → 1) {
   *     case (a, _) if a > 10000000 ⇒ Future.successful(None)
   *     case (a, b) ⇒ Future {
   *       Thread.sleep(1000)
   *       Some((b → (a + b)) → a)
   *     }
   *   }
   * }}}
   */
  def unfoldAsync[S, E](s: S)(f: S ⇒ Future[Option[(S, E)]]): Source[E, NotUsed] =
    Source.fromGraph(new UnfoldAsync(s, f))

  /**
   * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.
   */
  def empty[T]: Source[T, NotUsed] = _empty
  private[this] val _empty: Source[Nothing, NotUsed] =
    Source.fromGraph(EmptySource)

  /**
   * Create a `Source` which materializes a [[scala.concurrent.Promise]] which controls what element
   * will be emitted by the Source.
   * If the materialized promise is completed with a Some, that value will be produced downstream,
   * followed by completion.
   * If the materialized promise is completed with a None, no value will be produced downstream and completion will
   * be signalled immediately.
   * If the materialized promise is completed with a failure, then the returned source will terminate with that error.
   * If the downstream of this source cancels before the promise has been completed, then the promise will be completed
   * with None.
   */
  def maybe[T]: Source[T, Promise[Option[T]]] =
    Source.fromGraph(MaybeSource.asInstanceOf[Graph[SourceShape[T], Promise[Option[T]]]])
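
  /* Example (illustrative sketch, assuming an implicit `Materializer` is in scope):
   * complete the materialized promise later to either emit one element or end
   * the stream without emitting anything.
   *
   *   val (promise, done) = Source.maybe[Int].toMat(Sink.foreach(println))(Keep.both).run()
   *   promise.success(Some(42)) // emits 42, then completes the stream
   */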

  /**
   * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
   */
  def failed[T](cause: Throwable): Source[T, NotUsed] =
    Source.fromGraph(new FailedSource[T](cause))

  /**
   * Creates a `Source` that is not materialized until there is downstream demand. When the source gets materialized
   * the materialized future is completed with its value. If downstream cancels or fails without any demand the
   * `create` factory is never called and the materialized `Future` is failed.
   */
  def lazily[T, M](create: () ⇒ Source[T, M]): Source[T, Future[M]] =
    Source.fromGraph(new LazySource[T, M](create))
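
  /* Example (illustrative sketch): defer building an expensive source until the
   * first downstream demand arrives.
   *
   *   val lazySource: Source[Int, Future[NotUsed]] =
   *     Source.lazily(() => Source(1 to 1000000))
   */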

  /**
   * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
   */
  def asSubscriber[T]: Source[T, Subscriber[T]] =
    fromGraph(new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource")))

  /**
   * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
   * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must
   * be [[akka.stream.actor.ActorPublisher]].
   *
   * @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
   */
  @deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
  def actorPublisher[T](props: Props): Source[T, ActorRef] = {
    require(classOf[ActorPublisher[_]].isAssignableFrom(props.actorClass()), "Actor must be ActorPublisher")
    fromGraph(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource")))
  }

  /**
   * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
   * Messages sent to this actor will be emitted to the stream if there is demand from downstream,
   * otherwise they will be buffered until request for demand is received.
   *
   * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
   * there is no space available in the buffer.
   *
   * The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an
   * IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
   *
   * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand
   * from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after
   * this Source; as such, it is never safe to assume the downstream will always generate demand.
   *
   * The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]]
   * (whose content will be ignored) in which case already buffered elements will be signaled before signaling
   * completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately.
   *
   * The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the
   * actor reference. In case the Actor is still draining its internal buffer (after having received
   * a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]],
   * the failure will be signaled downstream immediately (instead of the completion signal).
   *
   * The actor will be stopped when the stream is completed, failed or canceled from downstream,
   * i.e. you can watch it to get notified when that happens.
   *
   * See also [[akka.stream.scaladsl.Source.queue]].
   *
   * @param bufferSize The size of the buffer in element count
   * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
   */
  def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
    require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
    require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")
    fromGraph(new ActorRefSource(bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource")))
  }
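
  /* Example (illustrative sketch, assuming an implicit `Materializer` is in scope):
   * push elements into the stream by sending messages to the materialized actor,
   * and complete it with `Status.Success`.
   *
   *   val ref: ActorRef =
   *     Source.actorRef[String](bufferSize = 16, overflowStrategy = OverflowStrategy.dropHead)
   *       .to(Sink.foreach(println))
   *       .run()
   *   ref ! "hello"
   *   ref ! akka.actor.Status.Success("done")
   */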

  /**
   * Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`.
   */
  def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] =
    Source.fromGraph(GraphDSL.create() { implicit b ⇒
      import GraphDSL.Implicits._
      val c = b.add(strategy(rest.size + 2))
      first ~> c.in(0)
      second ~> c.in(1)

      @tailrec def combineRest(idx: Int, i: Iterator[Source[T, _]]): SourceShape[U] =
        if (i.hasNext) {
          i.next() ~> c.in(idx)
          combineRest(idx + 1, i)
        } else SourceShape(c.out)

      combineRest(2, rest.iterator)
    })
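
  /* Example (illustrative sketch): merge three sources of the same element type
   * using `Merge` as the fan-in strategy.
   *
   *   val merged: Source[Int, NotUsed] =
   *     Source.combine(Source(1 to 3), Source(4 to 6), Source(7 to 9))(Merge(_))
   */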

  /**
   * Combine the elements of multiple streams into a stream of sequences.
   */
  def zipN[T](sources: immutable.Seq[Source[T, _]]): Source[immutable.Seq[T], NotUsed] = zipWithN(ConstantFun.scalaIdentityFunction[immutable.Seq[T]])(sources).addAttributes(DefaultAttributes.zipN)

  /**
   * Combine the elements of multiple streams into a stream of sequences using a combiner function.
   */
  def zipWithN[T, O](zipper: immutable.Seq[T] ⇒ O)(sources: immutable.Seq[Source[T, _]]): Source[O, NotUsed] = {
    val source = sources match {
      case immutable.Seq()       ⇒ empty[O]
      case immutable.Seq(source) ⇒ source.map(t ⇒ zipper(immutable.Seq(t))).mapMaterializedValue(_ ⇒ NotUsed)
      case s1 +: s2 +: ss        ⇒ combine(s1, s2, ss: _*)(ZipWithN(zipper))
    }
    source.addAttributes(DefaultAttributes.zipWithN)
  }
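
  /* Example (illustrative sketch): zip several sources element-wise and sum each
   * combined sequence into one output element.
   *
   *   val summed: Source[Int, NotUsed] =
   *     Source.zipWithN[Int, Int](_.sum)(List(Source(1 to 3), Source(10 to 12)))
   */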

  /**
   * Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueue]].
   * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
   * otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded
   * if downstream is terminated.
   *
   * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
   * there is no space available in the buffer.
   *
   * An acknowledgement mechanism is available.
   * [[akka.stream.scaladsl.SourceQueue.offer]] returns a `Future[QueueOfferResult]` which completes with
   * `QueueOfferResult.Enqueued` if the element was added to the buffer or sent downstream. It completes with
   * `QueueOfferResult.Dropped` if the element was dropped. It can also complete with `QueueOfferResult.Failure`
   * when the stream has failed, or `QueueOfferResult.QueueClosed` when downstream is completed.
   *
   * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete the last `offer():Future`
   * call when the buffer is full.
   *
   * You can watch the accessibility of the stream with [[akka.stream.scaladsl.SourceQueue.watchCompletion]].
   * It returns a future that completes with success when the stream is completed, or fails when the stream fails.
   *
   * The buffer can be disabled by using a `bufferSize` of 0; then a received message will wait
   * for downstream demand unless there is another message already waiting for downstream demand, in which case
   * the offer result will be completed according to the overflow strategy.
   *
   * @param bufferSize size of buffer in element count
   * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
   */
  def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] =
    Source.fromGraph(new QueueSource(bufferSize, overflowStrategy).withAttributes(DefaultAttributes.queueSource))
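
  /* Example (illustrative sketch, assuming an implicit `Materializer` is in scope):
   * offer elements to the materialized queue and inspect the offer result.
   *
   *   val queue: SourceQueueWithComplete[Int] =
   *     Source.queue[Int](bufferSize = 100, overflowStrategy = OverflowStrategy.backpressure)
   *       .to(Sink.foreach(println))
   *       .run()
   *   queue.offer(1) // Future[QueueOfferResult]
   *   queue.complete()
   */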

  /**
   * Start a new `Source` from some resource which can be opened, read and closed.
   * Interaction with resource happens in a blocking way.
   *
   * Example:
   * {{{
   * Source.unfoldResource(
   *   () => new BufferedReader(new FileReader("...")),
   *   reader => Option(reader.readLine()),
   *   reader => reader.close())
   * }}}
   *
   * You can use the supervision strategy to handle exceptions for `read` function. All exceptions thrown by `create`
   * or `close` will fail the stream.
   *
   * `Restart` supervision strategy will close and create blocking IO again. Default strategy is `Stop` which means
   * that stream will be terminated on error in `read` function by default.
   *
   * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
   * set it for a given Source by using [[ActorAttributes]].
   *
   * @param create - function that is called on stream start and creates/opens resource.
   * @param read - function that reads data from opened resource. It is called each time backpressure signal
   *               is received. Stream calls close and completes when `read` returns None.
   * @param close - function that closes resource
   */
  def unfoldResource[T, S](create: () ⇒ S, read: (S) ⇒ Option[T], close: (S) ⇒ Unit): Source[T, NotUsed] =
    Source.fromGraph(new UnfoldResourceSource(create, read, close))

  /**
   * Start a new `Source` from some resource which can be opened, read and closed.
   * It is similar to `unfoldResource` but takes functions that return `Futures` instead of plain values.
   *
   * You can use the supervision strategy to handle exceptions for `read` function or failures of produced `Futures`.
   * All exceptions thrown by `create` or `close` as well as failures of returned futures will fail the stream.
   *
   * `Restart` supervision strategy will close and create resource. Default strategy is `Stop` which means
   * that stream will be terminated on error in `read` function (or future) by default.
   *
   * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
   * set it for a given Source by using [[ActorAttributes]].
   *
   * @param create - function that is called on stream start and creates/opens resource.
   * @param read - function that reads data from opened resource. It is called each time backpressure signal
   *               is received. Stream calls close and completes when `Future` from read function returns None.
   * @param close - function that closes resource
   */
  def unfoldResourceAsync[T, S](create: () ⇒ Future[S], read: (S) ⇒ Future[Option[T]], close: (S) ⇒ Future[Done]): Source[T, NotUsed] =
    Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close))
}