2018-10-29 17:19:37 +08:00
/*
2019-01-02 18:55:26 +08:00
* Copyright ( C ) 2014 - 2019 Lightbend Inc . < https : //www.lightbend.com>
2014-09-03 21:54:18 +02:00
*/
2018-03-13 23:45:55 +09:00
2014-10-27 14:35:41 +01:00
package akka.stream.scaladsl
2014-09-03 21:54:18 +02:00
2017-11-22 13:51:24 +01:00
import java.util.concurrent.CompletionStage
2018-01-22 01:16:36 +09:00
2015-04-16 02:24:01 +02:00
import akka.actor. { ActorRef , Cancellable , Props }
2018-11-16 15:00:30 +01:00
import akka.annotation. { ApiMayChange , InternalApi }
2015-10-09 15:11:01 -04:00
import akka.stream.actor.ActorPublisher
2017-11-22 13:51:24 +01:00
import akka.stream.impl.Stages.DefaultAttributes
2015-12-14 17:02:00 +01:00
import akka.stream.impl.fusing.GraphStages
import akka.stream.impl.fusing.GraphStages._
2017-11-22 13:51:24 +01:00
import akka.stream.impl. { PublisherSource , _ }
2015-06-29 23:47:31 -04:00
import akka.stream. { Outlet , SourceShape , _ }
2017-11-22 13:51:24 +01:00
import akka.util.ConstantFun
import akka. { Done , NotUsed }
2015-07-06 22:00:21 +02:00
import org.reactivestreams. { Publisher , Subscriber }
2015-06-29 23:47:31 -04:00
import scala.annotation.tailrec
2015-12-14 17:02:00 +01:00
import scala.annotation.unchecked.uncheckedVariance
2014-09-03 21:54:18 +02:00
import scala.collection.immutable
2016-08-09 21:08:31 -05:00
import scala.concurrent.duration.FiniteDuration
2015-04-16 02:24:01 +02:00
import scala.concurrent. { Future , Promise }
2018-01-04 17:21:47 +01:00
2018-07-11 18:19:40 +02:00
import akka.stream.stage.GraphStageWithMaterializedValue
2018-01-04 17:21:47 +01:00
import scala.compat.java8.FutureConverters._
2014-10-02 17:32:08 +02:00
/* *
2015-01-28 14:19:50 +01:00
* A `Source` is a set of stream processing steps that has one open output . It can comprise
* any number of internal sources and transformations that are wired together , or it can be
* an “ atomic ” source , e . g . from a collection or a file . Materialization turns a Source into
* a Reactive Streams `Publisher` ( at least conceptually ) .
2014-10-02 17:32:08 +02:00
*/
2019-03-13 10:56:20 +01:00
final class Source [ + Out , + Mat ] (
override val traversalBuilder : LinearTraversalBuilder ,
override val shape : SourceShape [ Out ] )
2019-03-11 10:38:24 +01:00
extends FlowOpsMat [ Out , Mat ]
with Graph [ SourceShape [ Out ] , Mat ] {
2015-01-28 14:19:50 +01:00
2015-11-25 19:58:48 +01:00
// Overrides of the type members inherited from the FlowOpsMat hierarchy, fixed to Source / RunnableGraph
// so that operators applied to a Source yield a Source again and closing it yields a RunnableGraph.
// `Mat` is declared covariant on the class, hence the @uncheckedVariance where it is re-used here.
override type Repr [ + O ] = Source [ O , Mat @ uncheckedVariance ]
override type ReprMat [ + O , + M ] = Source [ O , M ]
override type Closed = RunnableGraph [ Mat @ uncheckedVariance ]
override type ClosedMat [ + M ] = RunnableGraph [ M ]
2015-01-28 14:19:50 +01:00
2016-07-27 13:29:23 +02:00
// Renders this source by interpolating its `shape` into a fixed "Source(...)" template.
// NOTE(review): the padding spaces inside the literal look like a formatting artifact — confirm before relying on the exact text.
override def toString : String = s" Source( $shape ) "
2016-03-11 17:08:30 +01:00
2015-11-25 19:58:48 +01:00
// Appends `flow` to this source, keeping this source's materialized value (Keep.left).
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] =
  viaMat[T, Mat2, Mat](flow)(Keep.left)
2019-03-11 10:38:24 +01:00
// Appends `flow` to this source and merges both materialized values with `combine`.
// Appending the identity flow is special-cased for the well-known `Keep` combiners.
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(
    combine: (Mat, Mat2) => Mat3): Source[T, Mat3] = {
  // General case: append the flow's traversal and expose the flow's outlet.
  def appendFlow: Source[T, Mat3] =
    new Source[T, Mat3](
      traversalBuilder.append(flow.traversalBuilder, flow.shape, combine),
      SourceShape(flow.shape.out))

  val isIdentityFlow = flow.traversalBuilder eq Flow.identityTraversalBuilder

  if (!isIdentityFlow) appendFlow
  else if (combine == Keep.left)
    // optimization by returning this; Mat == Mat3, due to Keep.left
    this.asInstanceOf[Source[T, Mat3]]
  else if (combine == Keep.right || combine == Keep.none) // Mat3 = NotUsed
    // optimization with LinearTraversalBuilder.empty(); this source's own outlet stays exposed
    new Source[T, Mat3](
      traversalBuilder.append(LinearTraversalBuilder.empty(), flow.shape, combine),
      SourceShape(shape.out).asInstanceOf[SourceShape[T]])
  else appendFlow
}
2014-10-02 17:32:08 +02:00
/**
 * Attaches the given [[akka.stream.scaladsl.Sink]] to this [[akka.stream.scaladsl.Source]],
 * concatenating the processing steps of both. The materialized value of this source is kept.
 */
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat] =
  toMat[Mat2, Mat](sink)(Keep.left)
2015-01-28 14:19:50 +01:00
/**
 * Attaches the given [[akka.stream.scaladsl.Sink]] to this [[akka.stream.scaladsl.Source]],
 * concatenating the processing steps of both. The materialized values of the two sides
 * are merged with `combine`.
 */
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) => Mat3): RunnableGraph[Mat3] = {
  val closedTraversal = traversalBuilder.append(sink.traversalBuilder, sink.shape, combine)
  RunnableGraph(closedTraversal)
}
/**
 * Transforms only the materialized value of this Source with `f`; the shape and the
 * processing steps are carried over unchanged.
 */
override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] = {
  // The traversal builder works on untyped functions, so the static types are erased here.
  val untyped = f.asInstanceOf[Any => Any]
  new Source[Out, Mat2](traversalBuilder.transformMat(untyped), shape)
}
2015-01-28 14:19:50 +01:00
2018-02-21 06:06:01 +00:00
/**
 * Materializes this Source right away and returns (1) its materialized value and (2) a new
 * Source that can be used to consume the elements of the freshly materialized Source.
 */
def preMaterialize()(implicit materializer: Materializer): (Mat, ReprMat[Out, NotUsed]) = {
  // Run into a fan-out publisher sink and keep both materialized values.
  val runnable = toMat(Sink.asPublisher[Out](fanout = true))(Keep.both)
  runnable.run() match {
    case (matValue, publisher) => (matValue, Source.fromPublisher(publisher))
  }
}
2014-10-02 13:34:27 +02:00
/**
 * Connects this `Source` to the given `Sink` and runs the result. The value returned is the
 * materialized value of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]].
 */
def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = {
  val runnable = toMat[Mat2, Mat2](sink)(Keep.right)
  runnable.run()
}
2014-09-03 21:54:18 +02:00
2014-10-06 14:46:52 +02:00
/**
 * Shortcut for running this `Source` with a fold function.
 * `f` is invoked for every received element, receiving its previous output
 * (or `zero` for the first element) together with the element.
 * The returned [[scala.concurrent.Future]] is completed with the value of the final
 * function evaluation when the input stream ends, or completed with `Failure`
 * if a failure is signaled in the stream.
 */
def runFold[U](zero: U)(f: (U, Out) => U)(implicit materializer: Materializer): Future[U] = {
  val foldingSink = Sink.fold[U, Out](zero)(f)
  runWith(foldingSink)
}
2016-08-24 21:02:32 +02:00
/**
 * Shortcut for running this `Source` with a foldAsync function.
 * `f` is invoked for every received element, receiving its previous output
 * (or `zero` for the first element) together with the element.
 * The returned [[scala.concurrent.Future]] is completed with the value of the final
 * function evaluation when the input stream ends, or completed with `Failure`
 * if a failure is signaled in the stream.
 */
def runFoldAsync[U](zero: U)(f: (U, Out) => Future[U])(implicit materializer: Materializer): Future[U] = {
  val foldingSink = Sink.foldAsync[U, Out](zero)(f)
  runWith(foldingSink)
}
2014-10-06 14:46:52 +02:00
2016-01-15 22:51:26 -05:00
/**
 * Shortcut for running this `Source` with a reduce function.
 * `f` is invoked for every received element (from the second element on), receiving its
 * previous output together with the element.
 * The returned [[scala.concurrent.Future]] is completed with the value of the final
 * function evaluation when the input stream ends, or completed with `Failure`
 * if a failure is signaled in the stream.
 *
 * If the stream is empty (i.e. completes before signalling any elements),
 * the reduce operator will fail its downstream with a [[NoSuchElementException]],
 * which is semantically in line with what Scala's standard library collections
 * do in such situations.
 */
def runReduce[U >: Out](f: (U, U) => U)(implicit materializer: Materializer): Future[U] = {
  val reducingSink = Sink.reduce[U](f)
  runWith(reducingSink)
}
2014-10-06 14:46:52 +02:00
/**
 * Shortcut for running this `Source` with a foreach procedure; `f` is invoked
 * for each received element.
 * The returned [[scala.concurrent.Future]] is completed with `Success` when the normal
 * end of the stream is reached, or completed with `Failure` if a failure is signaled in
 * the stream.
 */
// FIXME: Out => Unit should stay, right??
def runForeach(f: Out => Unit)(implicit materializer: Materializer): Future[Done] = {
  val foreachSink = Sink.foreach[Out](f)
  runWith(foreachSink)
}
2014-10-06 14:46:52 +02:00
2015-07-06 22:00:21 +02:00
/**
 * Replaces the attributes of this [[Source]] with the given ones. If this Source is a composite
 * of multiple graphs, the new attributes on the composite will be less specific than attributes
 * set directly on the individual graphs of the composite.
 */
override def withAttributes(attr: Attributes): Repr[Out] = {
  val rebuilt = traversalBuilder.setAttributes(attr)
  new Source[Out, Mat](rebuilt, shape)
}
2014-12-01 20:07:55 +02:00
2015-12-22 20:56:02 +01:00
/**
 * Adds the given attributes to this Source. An attribute that was already present on this
 * source is replaced by the new value. If this Source is a composite of multiple graphs,
 * the added attributes will be on the composite and therefore less specific than attributes
 * set directly on the individual graphs of the composite.
 */
override def addAttributes(attr: Attributes): Repr[Out] = {
  val merged = traversalBuilder.attributes and attr
  withAttributes(merged)
}
2015-12-22 20:56:02 +01:00
/**
 * Adds a `name` attribute to this Source.
 */
override def named(name: String): Repr[Out] = {
  val nameAttribute = Attributes.name(name)
  addAttributes(nameAttribute)
}
/**
 * Puts an asynchronous boundary around this `Source`.
 */
override def async: Repr[Out] = {
  // The inherited implementation is only re-typed to this class's representation.
  val bounded = super.async
  bounded.asInstanceOf[Repr[Out]]
}
/**
 * Puts an asynchronous boundary around this `Graph`.
 *
 * @param dispatcher Run the graph on this dispatcher
 */
override def async(dispatcher: String): Repr[Out] = {
  // The inherited implementation is only re-typed to this class's representation.
  val bounded = super.async(dispatcher)
  bounded.asInstanceOf[Repr[Out]]
}
/**
 * Puts an asynchronous boundary around this `Graph`.
 *
 * @param dispatcher Run the graph on this dispatcher
 * @param inputBufferSize Set the input buffer to this size for the graph
 */
override def async(dispatcher: String, inputBufferSize: Int): Repr[Out] = {
  // The inherited implementation is only re-typed to this class's representation.
  val bounded = super.async(dispatcher, inputBufferSize)
  bounded.asInstanceOf[Repr[Out]]
}
2015-04-14 08:59:37 +02:00
2016-08-11 07:37:54 -05:00
/**
 * Converts this Scala DSL element to its Java DSL counterpart.
 */
def asJava: javadsl.Source[Out @uncheckedVariance, Mat @uncheckedVariance] =
  new javadsl.Source[Out, Mat](this)
2015-06-29 23:47:31 -04:00
/**
 * Combines several sources with a fan-in strategy such as `Merge` or `Concat` and returns a `Source`.
 * `strategy` is called with the total number of sources and must return the fan-in graph to use.
 */
@deprecated("Use `Source.combine` on companion object instead", "2.5.5")
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
    strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] =
  Source.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val fanIn = b.add(strategy(rest.size + 2))
    // The two mandatory sources take the first two inlets ...
    first ~> fanIn.in(0)
    second ~> fanIn.in(1)
    // ... and the remaining ones follow in order, starting at inlet 2.
    rest.iterator.zipWithIndex.foreach {
      case (source, offset) => source ~> fanIn.in(offset + 2)
    }
    SourceShape(fanIn.out)
  })
2019-01-15 16:53:02 +01:00
2018-11-16 15:00:30 +01:00
/**
 * Pairs every element with a context value computed from it by `f` and wraps the
 * result in a [[SourceWithContext]].
 *
 * API MAY CHANGE
 */
@ApiMayChange
def asSourceWithContext[Ctx](f: Out => Ctx): SourceWithContext[Out, Ctx, Mat] = {
  val withContext = this.map { element =>
    (element, f(element))
  }
  new SourceWithContext(withContext)
}
2014-10-02 17:32:08 +02:00
}
2015-10-21 22:45:39 +02:00
object Source {
2019-03-11 10:38:24 +01:00
2015-04-16 02:24:01 +02:00
/**
 * INTERNAL API
 *
 * Creates a [[SourceShape]] whose single outlet is named `name` followed by ".out".
 */
def shape[T](name: String): SourceShape[T] = {
  val outlet = Outlet[T](name + ".out")
  SourceShape(outlet)
}
2015-01-28 14:19:50 +01:00
/**
 * Helper to create a [[Source]] from a `Publisher`.
 *
 * Constructs a transformation that starts with the given publisher. The transformation steps
 * are executed by a series of [[org.reactivestreams.Processor]] instances
 * that mediate the flow of elements downstream and the propagation of
 * back-pressure upstream.
 */
def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] = {
  val outShape = shape[T]("PublisherSource")
  fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, outShape))
}
2015-01-28 14:19:50 +01:00
2014-09-03 21:54:18 +02:00
/**
 * Helper to create a [[Source]] from an `Iterator`.
 * Example usage: `Source.fromIterator(() => Iterator.from(0))`
 *
 * Starts a new `Source` from the given function that produces an Iterator.
 * The produced stream of elements will continue until the iterator runs empty
 * or fails during evaluation of the `next()` method.
 * Elements are pulled out of the iterator in accordance with the demand coming
 * from the downstream transformation steps.
 */
def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed] = {
  // Adapt the factory to an Iterable so that each call to `iterator` invokes `f` again.
  val onDemand: immutable.Iterable[T] = new immutable.Iterable[T] {
    override def iterator: Iterator[T] = f()
    override def toString: String = "() => Iterator"
  }
  apply(onDemand)
}
2015-01-28 14:19:50 +01:00
2016-03-22 21:59:52 -05:00
/**
 * Creates a [[Source]] that will continually produce the given elements in the specified order.
 *
 * Starts a new 'cycled' `Source` from the given elements. The produced stream of elements
 * will continue infinitely by repeating the sequence of elements provided by the function parameter.
 *
 * `f` is called again each time the previous iterator is exhausted. If it ever returns an
 * empty iterator an `IllegalArgumentException("empty iterator")` is thrown while elements are pulled.
 * Each materialization gets its own cycling iterator.
 */
def cycle[T](f: () => Iterator[T]): Source[T, NotUsed] = {
  // Build the endless iterator inside the factory handed to `fromIterator`: an iterator is
  // stateful, so creating it once out here would make every materialization of the returned
  // Source share (and race on) the same instance.
  def endless(): Iterator[T] =
    Iterator.continually {
      val next = f()
      if (next.isEmpty) throw new IllegalArgumentException("empty iterator")
      else next
    }.flatten

  fromIterator(() => endless()).withAttributes(DefaultAttributes.cycledSource)
}
2015-01-28 14:19:50 +01:00
/**
 * A graph with the shape of a source logically is a source; this method makes
 * it so in type as well.
 */
def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match {
  // Already a scaladsl Source: nothing to do.
  case scalaSource: Source[T, M] => scalaSource
  // A javadsl Source: unwrap to its Scala counterpart.
  case javaSource: javadsl.Source[T, M] => javaSource.asScala
  case stage: GraphStageWithMaterializedValue[SourceShape[T], M] =>
    // move these from the stage itself to make the returned source
    // behave as it is the stage with regards to attributes
    val stageAttributes = stage.traversalBuilder.attributes
    val bareStage = stage.withAttributes(Attributes.none)
    val linear = LinearTraversalBuilder.fromBuilder(bareStage.traversalBuilder, bareStage.shape, Keep.right)
    new Source(linear, bareStage.shape).withAttributes(stageAttributes)
  case composite =>
    // composite source shaped graph
    val linear = LinearTraversalBuilder.fromBuilder(composite.traversalBuilder, composite.shape, Keep.right)
    new Source(linear, composite.shape)
}
2014-09-03 21:54:18 +02:00
2019-05-17 09:54:18 +03:00
/**
 * Defers the creation of a [[Source]] until materialization. The `factory` function
 * exposes the [[ActorMaterializer]] which is going to be used during materialization and
 * the [[Attributes]] of the [[Source]] returned by this method.
 *
 * @param factory builds the actual source from the materializer and attributes
 * @return a source whose materialized value is a `Future` of the inner source's materialized value
 */
// Uses the ASCII `=>` arrow like the rest of this file; the Unicode arrow is deprecated in Scala 2.13.
def setup[T, M](factory: (ActorMaterializer, Attributes) => Source[T, M]): Source[T, Future[M]] =
  Source.fromGraph(new SetupSourceStage(factory))
2014-09-03 21:54:18 +02:00
/**
 * Helper to create a [[Source]] from an `Iterable`.
 * Example usage: `Source(Seq(1,2,3))`
 *
 * Starts a new `Source` from the given `Iterable`. This is like starting from an
 * Iterator, but every Subscriber directly attached to the Publisher of this
 * stream will see an individual flow of elements (always starting from the
 * beginning) regardless of when they subscribed.
 */
def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = {
  // Emit the iterable as one element, then flatten it into its elements.
  val wrapped = single(iterable)
  val flattened = wrapped.mapConcat(ConstantFun.scalaIdentityFunction)
  flattened.withAttributes(DefaultAttributes.iterableSource)
}
2015-01-28 14:19:50 +01:00
/**
 * Starts a new `Source` from the given `Future`. The stream will consist of
 * one element when the `Future` is completed with a successful value, which
 * may happen before or after materialization.
 * The stream terminates with a failure if the `Future` is completed with a failure.
 */
def fromFuture[T](future: Future[T]): Source[T, NotUsed] = {
  val stage = new FutureSource(future)
  fromGraph(stage)
}
2014-09-03 21:54:18 +02:00
2016-01-21 16:37:26 +01:00
/**
 * Starts a new `Source` from the given `CompletionStage`. The stream will consist of
 * one element when the `CompletionStage` is completed with a successful value, which
 * may happen before or after materialization.
 * The stream terminates with a failure if the `CompletionStage` is completed with a failure.
 */
def fromCompletionStage[T](future: CompletionStage[T]): Source[T, NotUsed] = {
  // Convert to a Scala Future and reuse the Future-based stage.
  val scalaFuture: Future[T] = future.toScala
  fromGraph(new FutureSource(scalaFuture))
}
2016-09-04 16:11:49 +02:00
/**
 * Streams the elements of the given future source once it successfully completes.
 * If the [[Future]] fails, the stream is failed with the exception from the future. If downstream
 * cancels before the stream completes, the materialized `Future` will be failed with a
 * [[StreamDetachedException]].
 */
def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): Source[T, Future[M]] = {
  val stage = new FutureFlattenSource(future)
  fromGraph(stage)
}
2016-09-04 16:11:49 +02:00
/**
 * Streams the elements of an asynchronous source once its given `completion` operator completes.
 * If the [[CompletionStage]] fails, the stream is failed with the exception from the future.
 * If downstream cancels before the stream completes, the materialized `CompletionStage` will be
 * failed with a [[StreamDetachedException]].
 */
def fromSourceCompletionStage[T, M](
    completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = {
  // Go through the Future-based variant, then convert the materialized Future back to Java.
  val viaFuture = fromFutureSource[T, M](completion.toScala)
  viaFuture.mapMaterializedValue(_.toJava)
}
2016-09-04 16:11:49 +02:00
2014-09-03 21:54:18 +02:00
/**
 * Elements are emitted periodically with the specified interval.
 * The tick element will be delivered to downstream consumers that have requested any elements.
 * If a consumer has not requested any elements at the point in time when the tick
 * element is produced it will not receive that tick element later. It will
 * receive new tick elements as soon as it has requested more elements.
 */
def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] = {
  val stage = new TickSource[T](initialDelay, interval, tick)
  fromGraph(stage)
}
2014-10-02 13:34:27 +02:00
2014-10-27 09:48:54 +02:00
/**
 * Creates a `Source` with one element.
 * Every connected `Sink` of this stream will see an individual stream consisting of one element.
 */
def single[T](element: T): Source[T, NotUsed] = {
  val stage = new GraphStages.SingleSource(element)
  fromGraph(stage)
}
2014-10-27 09:48:54 +02:00
2015-02-26 12:36:46 +01:00
/**
 * Creates a `Source` that will continually emit the given element.
 */
def repeat[T](element: T): Source[T, NotUsed] = {
  // One shared step result: the state and the emitted value are both `element`.
  val sameStep: Option[(T, T)] = Some((element, element))
  val endless = unfold[T, T](element)(_ => sameStep)
  endless.withAttributes(DefaultAttributes.repeat)
}
2015-02-26 12:36:46 +01:00
2015-12-10 12:49:59 +02:00
/* *
2015-12-14 13:28:21 +02:00
* Create a `Source` that will unfold a value of type `S` into
2015-12-10 12:49:59 +02:00
* a pair of the next state `S` and output elements of type `E` .
*
2015-12-14 13:28:21 +02:00
* For example , all the Fibonacci numbers under 10 M :
2015-12-10 12:49:59 +02:00
*
* { { {
2019-02-09 15:25:39 +01:00
* Source . unfold ( 0 -> 1 ) {
* case ( a , _ ) if a > 10000000 => None
* case ( a , b ) => Some ( ( b -> ( a + b ) ) -> a )
2015-12-10 12:49:59 +02:00
* }
* } } }
*/
2019-02-09 15:25:39 +01:00
def unfold[S, E](s: S)(f: S => Option[(S, E)]): Source[E, NotUsed] = {
  // Delegates to the `Unfold` stage, seeded with the initial state `s`.
  val stage = new Unfold(s, f)
  Source.fromGraph(stage)
}
2015-12-10 12:49:59 +02:00
/* *
2015-12-14 13:28:21 +02:00
* Same as [ [ unfold ] ] , but uses an async function to generate the next state - element tuple .
2015-12-10 12:49:59 +02:00
*
* async fibonacci example :
*
* { { {
2019-02-09 15:25:39 +01:00
* Source . unfoldAsync ( 0 -> 1 ) {
* case ( a , _ ) if a > 10000000 => Future . successful ( None )
* case ( a , b ) => Future {
2015-12-10 12:49:59 +02:00
* Thread . sleep ( 1000 )
2019-02-09 15:25:39 +01:00
* Some ( ( b -> ( a + b ) ) -> a )
2015-12-10 12:49:59 +02:00
* }
* }
* } } }
*/
2019-02-09 15:25:39 +01:00
def unfoldAsync[S, E](s: S)(f: S => Future[Option[(S, E)]]): Source[E, NotUsed] = {
  // Delegates to the `UnfoldAsync` stage, seeded with the initial state `s`.
  val stage = new UnfoldAsync(s, f)
  Source.fromGraph(stage)
}
2015-12-10 12:49:59 +02:00
2014-10-27 09:48:54 +02:00
// Single instance handed out by `empty` for every element type; typed as a Source of Nothing.
private[this] val _empty: Source[Nothing, NotUsed] = {
  val stage = EmptySource
  Source.fromGraph(stage)
}

/**
 * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.
 */
def empty[T]: Source[T, NotUsed] = _empty
2014-10-27 09:48:54 +02:00
2015-01-29 10:21:54 +01:00
/* *
2015-10-21 22:45:39 +02:00
* Create a `Source` which materializes a [ [ scala . concurrent . Promise ] ] which controls what element
* will be emitted by the Source .
* If the materialized promise is completed with a Some , that value will be produced downstream ,
* followed by completion .
* If the materialized promise is completed with a None , no value will be produced downstream and completion will
* be signalled immediately .
* If the materialized promise is completed with a failure , then the returned source will terminate with that error .
* If the downstream of this source cancels before the promise has been completed , then the promise will be completed
* with None .
2015-01-29 10:21:54 +01:00
*/
2015-10-21 22:45:39 +02:00
def maybe[T]: Source[T, Promise[Option[T]]] = {
  // `MaybeSource` is referenced as a value (no `new`), so the cast only re-types it for `T`.
  val typedStage = MaybeSource.asInstanceOf[Graph[SourceShape[T], Promise[Option[T]]]]
  Source.fromGraph(typedStage)
}
2015-01-28 14:19:50 +01:00
/**
 * Creates a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
 */
def failed[T](cause: Throwable): Source[T, NotUsed] = {
  val stage = new FailedSource[T](cause)
  Source.fromGraph(stage)
}
2014-10-27 09:48:54 +02:00
2016-11-25 16:25:26 +01:00
/**
 * Creates a `Source` that is not materialized until there is downstream demand. When the source
 * gets materialized the materialized future is completed with its value. If downstream cancels
 * or fails without any demand, the `create` factory is never called and the materialized
 * `Future` is failed.
 */
def lazily[T, M](create: () => Source[T, M]): Source[T, Future[M]] = {
  val stage = new LazySource[T, M](create)
  Source.fromGraph(stage)
}
2018-02-21 19:15:25 -06:00
/**
 * Creates a `Source` from the supplied future factory, which is not called until there is
 * downstream demand. When the source gets materialized the materialized future is completed
 * with the value from the factory. If downstream cancels or fails without any demand, the
 * `create` factory is never called and the materialized `Future` is failed.
 *
 * @see [[Source.lazily]]
 */
def lazilyAsync[T](create: () => Future[T]): Source[T, Future[NotUsed]] = {
  // Wrap the future factory into a source factory and defer it through `lazily`.
  val deferredSource: () => Source[T, NotUsed] = () => fromFuture(create())
  lazily(deferredSource)
}
2018-02-21 19:15:25 -06:00
2014-10-17 14:05:50 +02:00
/**
 * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]].
 */
def asSubscriber[T]: Source[T, Subscriber[T]] = {
  val outShape = shape[T]("SubscriberSource")
  fromGraph(new SubscriberSource[T](DefaultAttributes.subscriberSource, outShape))
}
2015-01-28 14:19:50 +01:00
2015-03-31 15:13:57 +02:00
/**
 * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
 * created according to the passed in [[akka.actor.Props]]. The actor created by the `props` must
 * be an [[akka.stream.actor.ActorPublisher]]; otherwise the `require` check below fails.
 *
 * @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
 */
@deprecated(
  "Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
  since = "2.5.0")
def actorPublisher[T](props: Props): Source[T, ActorRef] = {
  val isActorPublisher = classOf[ActorPublisher[_]].isAssignableFrom(props.actorClass())
  require(isActorPublisher, "Actor must be ActorPublisher")
  val outShape = shape[T]("ActorPublisherSource")
  fromGraph(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, outShape))
}
2015-03-31 15:13:57 +02:00
2017-09-07 21:07:41 +02:00
/* *
2018-01-19 19:22:40 +07:00
* INTERNAL API
*
2017-09-07 21:07:41 +02:00
* Creates a `Source` that is materialized as an [ [ akka . actor . ActorRef ] ] .
* Messages sent to this actor will be emitted to the stream if there is demand from downstream ,
* otherwise they will be buffered until request for demand is received .
*
* Depending on the defined [ [ akka . stream . OverflowStrategy ] ] it might drop elements if
* there is no space available in the buffer .
*
* The strategy [ [ akka . stream . OverflowStrategy . backpressure ] ] is not supported , and an
* `IllegalArgumentException("Backpressure overflowStrategy not supported")` will be thrown if it is passed as argument.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand
* from downstream . When `bufferSize` is 0 the `overflowStrategy` does not matter . An async boundary is added after
* this Source ; as such , it is never safe to assume the downstream will always generate demand .
*
* The stream can be completed successfully by sending the actor reference a message that is matched by
* `completionMatcher` in which case already buffered elements will be signaled before signaling
2018-07-01 11:18:34 +01:00
* completion .
2017-09-07 21:07:41 +02:00
*
* The stream can be completed with failure by sending a message that is matched by `failureMatcher` . The extracted
* [ [ Throwable ] ] will be used to fail the stream . In case the Actor is still draining its internal buffer ( after having received
* a message matched by `completionMatcher` ) before signaling completion and it receives a message matched by `failureMatcher` ,
* the failure will be signaled downstream immediately ( instead of the completion signal ) .
*
2018-07-01 11:18:34 +01:00
* Note that terminating the actor without first completing it , either with a success or a
* failure , will prevent the actor triggering downstream completion and the stream will continue
* to run even though the source actor is dead . Therefore you should ** not ** attempt to
* manually terminate the actor such as with a [ [ akka . actor . PoisonPill ] ] .
*
2017-09-07 21:07:41 +02:00
* The actor will be stopped when the stream is completed , failed or canceled from downstream ,
* i . e . you can watch it to get notified when that happens .
*
* See also [ [ akka . stream . scaladsl . Source . queue ] ] .
*
* @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
2019-03-13 10:56:20 +01:00
@InternalApi private[akka] def actorRef[T](
    completionMatcher: PartialFunction[Any, CompletionStrategy],
    failureMatcher: PartialFunction[Any, Throwable],
    bufferSize: Int,
    overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
  // Validate arguments eagerly; `require` raises IllegalArgumentException on violation.
  require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
  require(!overflowStrategy.isBackpressure, "Backpressure overflowStrategy not supported")

  // The stage that materializes the ActorRef; both matchers are handed over unchanged.
  val stage: Graph[SourceShape[T], ActorRef] =
    new ActorRefSource(bufferSize, overflowStrategy, completionMatcher, failureMatcher)

  Source.fromGraph(stage).withAttributes(DefaultAttributes.actorRefSource)
}
2015-03-31 15:13:57 +02:00
/* *
* Creates a `Source` that is materialized as an [ [ akka . actor . ActorRef ] ] .
* Messages sent to this actor will be emitted to the stream if there is demand from downstream ,
* otherwise they will be buffered until request for demand is received .
*
* Depending on the defined [ [ akka . stream . OverflowStrategy ] ] it might drop elements if
* there is no space available in the buffer .
*
2015-07-09 10:18:18 +02:00
* The strategy [ [ akka . stream . OverflowStrategy . backpressure ] ] is not supported , and an
* IllegalArgumentException ( "Backpressure overflowStrategy not supported" ) will be thrown if it is passed as argument .
*
2016-05-01 19:58:49 -06:00
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand
* from downstream . When `bufferSize` is 0 the `overflowStrategy` does not matter . An async boundary is added after
* this Source ; as such , it is never safe to assume the downstream will always generate demand .
2015-03-31 15:13:57 +02:00
*
2019-03-27 14:16:38 +01:00
* The stream can be completed successfully by sending the actor reference a [ [ akka . actor . Status . Success ] ] .
* If the content is [ [ akka . stream . CompletionStrategy . immediately ] ] the completion will be signaled immediately ,
* otherwise if the content is [ [ akka . stream . CompletionStrategy . draining ] ] ( or anything else )
* already buffered elements will be signaled before signaling completion .
* Sending [ [ akka . actor . PoisonPill ] ] will signal completion immediately but this behavior is deprecated and scheduled to be removed .
2015-03-31 15:13:57 +02:00
*
2016-04-15 16:29:53 +02:00
* The stream can be completed with failure by sending a [ [ akka . actor . Status . Failure ] ] to the
2015-04-28 09:48:46 +02:00
* actor reference . In case the Actor is still draining its internal buffer ( after having received
2016-04-15 16:29:53 +02:00
* a [ [ akka . actor . Status . Success ] ] ) before signaling completion and it receives a [ [ akka . actor . Status . Failure ] ] ,
2015-09-28 22:23:59 -07:00
* the failure will be signaled downstream immediately ( instead of the completion signal ) .
2015-03-31 15:13:57 +02:00
*
2015-09-28 22:23:59 -07:00
* The actor will be stopped when the stream is completed , failed or canceled from downstream ,
2015-03-31 15:13:57 +02:00
* i . e . you can watch it to get notified when that happens .
*
2016-10-26 10:24:51 +02:00
* See also [ [ akka . stream . scaladsl . Source . queue ] ] .
2016-05-01 19:58:49 -06:00
*
2017-09-07 21:07:41 +02:00
*
2015-03-31 15:13:57 +02:00
* @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
2017-09-07 21:07:41 +02:00
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
  // Maps the messages that complete the stream onto a completion strategy.
  val completionMatcher: PartialFunction[Any, CompletionStrategy] = {
    // An explicit strategy carried inside Status.Success is used as-is.
    case akka.actor.Status.Success(strategy: CompletionStrategy) => strategy
    // Any other Status.Success payload falls back to draining.
    case akka.actor.Status.Success(_) => CompletionStrategy.Draining
    // The Status.Success companion object itself (no argument list) also drains.
    case akka.actor.Status.Success => CompletionStrategy.Draining
  }

  // Extracts the cause from Status.Failure so it can be used to fail the stream.
  val failureMatcher: PartialFunction[Any, Throwable] = {
    case akka.actor.Status.Failure(cause) => cause
  }

  // Delegates to the matcher-based overload, which also validates the arguments.
  actorRef(completionMatcher, failureMatcher, bufferSize, overflowStrategy)
}
2015-03-31 15:13:57 +02:00
2015-06-29 23:47:31 -04:00
/* *
2017-09-18 09:48:57 +03:00
* Combines several sources with fan - in strategy like `Merge` or `Concat` and returns `Source` .
2015-06-29 23:47:31 -04:00
*/
2019-03-11 10:38:24 +01:00
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
    strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] =
  Source.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // The strategy is asked for a fan-in with one inlet per source: first, second and all of `rest`.
    val fanIn = builder.add(strategy(2 + rest.size))

    // Connects each remaining source to consecutive inlets, starting at `port`.
    @tailrec def wireRemaining(port: Int, pending: Iterator[Source[T, _]]): Unit =
      if (pending.hasNext) {
        pending.next() ~> fanIn.in(port)
        wireRemaining(port + 1, pending)
      }

    first ~> fanIn.in(0)
    second ~> fanIn.in(1)
    wireRemaining(2, rest.iterator)

    // Only the fan-in outlet is exposed, which makes the graph source-shaped.
    SourceShape(fanIn.out)
  })
2017-11-02 10:34:40 +09:00
/* *
* Combines two sources with fan - in strategy like `Merge` or `Concat` and returns `Source` with a materialized value .
*/
2019-03-11 10:38:24 +01:00
def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2])(
    strategy: Int => Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2) => M): Source[U, M] = {
  // Embed `second` into a flow-shaped graph: it is attached to inlet 1 of a two-input fan-in,
  // while inlet 0 is left open as the flow's input and the fan-in outlet becomes the flow's output.
  val fanInOverSecond: Graph[FlowShape[T, U], M2] = GraphDSL.create(second) { implicit builder => secondOut =>
    import GraphDSL.Implicits._
    val fanIn = builder.add(strategy(2))
    secondOut ~> fanIn.in(1)
    FlowShape(fanIn.in(0), fanIn.out)
  }

  // `first` feeds the open inlet; `matF` combines both materialized values.
  first.viaMat(fanInOverSecond)(matF)
}
2016-04-22 12:04:28 +02:00
/* *
* Combine the elements of multiple streams into a stream of sequences .
*/
2019-03-11 10:38:24 +01:00
def zipN[T](sources: immutable.Seq[Source[T, _]]): Source[immutable.Seq[T], NotUsed] = {
  // Zipping into a sequence is `zipWithN` with the identity function as the combiner.
  val keepSequence = ConstantFun.scalaIdentityFunction[immutable.Seq[T]]
  val zipped = zipWithN(keepSequence)(sources)
  zipped.addAttributes(DefaultAttributes.zipN)
}
2016-04-22 12:04:28 +02:00
/**
* Combine the elements of multiple streams into a stream of sequences using a combiner function .
*/
2019-02-09 15:25:39 +01:00
def zipWithN[T, O](zipper: immutable.Seq[T] => O)(sources: immutable.Seq[Source[T, _]]): Source[O, NotUsed] = {
  val zipped = sources match {
    // No sources at all: an empty source.
    case immutable.Seq() =>
      empty[O]
    // Exactly one source: apply the zipper to single-element sequences and drop the materialized value.
    case immutable.Seq(only) =>
      only.map(elem => zipper(immutable.Seq(elem))).mapMaterializedValue(_ => NotUsed)
    // Two or more sources: fan them in through ZipWithN.
    case head +: next +: tail =>
      combine(head, next, tail: _*)(ZipWithN(zipper))
  }
  zipped.addAttributes(DefaultAttributes.zipWithN)
}
2015-08-19 23:04:20 -04:00
/* *
2018-05-14 12:11:20 +01:00
* Creates a `Source` that is materialized as an [ [ akka . stream . scaladsl . SourceQueueWithComplete ] ] .
2015-08-19 23:04:20 -04:00
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream ,
2016-01-16 12:17:19 -05:00
* otherwise they will be buffered until request for demand is received . Elements in the buffer will be discarded
* if downstream is terminated .
2015-08-19 23:04:20 -04:00
*
* Depending on the defined [ [ akka . stream . OverflowStrategy ] ] it might drop elements if
* there is no space available in the buffer .
*
* Acknowledgement mechanism is available .
2018-05-14 12:11:20 +01:00
* [ [ akka . stream . scaladsl . SourceQueueWithComplete . offer ] ] returns `Future[QueueOfferResult]` which completes with
2016-10-26 10:24:51 +02:00
* `QueueOfferResult.Enqueued` if element was added to buffer or sent downstream . It completes with
* `QueueOfferResult.Dropped` if element was dropped . Can also complete with `QueueOfferResult.Failure` -
* when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed .
2015-08-19 23:04:20 -04:00
*
2016-01-16 12:17:19 -05:00
* The strategy [ [ akka . stream . OverflowStrategy . backpressure ] ] will not complete last `offer():Future`
* call when buffer is full .
2015-08-19 23:04:20 -04:00
*
2018-05-14 12:11:20 +01:00
* You can watch accessibility of stream with [ [ akka . stream . scaladsl . SourceQueueWithComplete . watchCompletion ] ] .
2019-01-13 18:23:15 +01:00
* It returns a future that completes with success when the operator is completed or fails when the stream is failed .
2015-08-19 23:04:20 -04:00
*
2016-11-22 16:11:09 +03:00
* The buffer can be disabled by using `bufferSize` of 0 and then received message will wait
* for downstream demand unless there is another message waiting for downstream demand , in that case
* offer result will be completed according to the overflow strategy .
2016-01-16 12:17:19 -05:00
*
* @param bufferSize size of buffer in element count
2015-08-19 23:04:20 -04:00
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
2016-02-25 16:05:35 +01:00
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = {
  // The stage that materializes the queue handle; attributes are applied to it before wrapping.
  val queueStage = new QueueSource[T](bufferSize, overflowStrategy)
  Source.fromGraph(queueStage.withAttributes(DefaultAttributes.queueSource))
}
2015-08-19 23:04:20 -04:00
2016-02-22 23:22:47 -05:00
/* *
* Start a new `Source` from some resource which can be opened , read and closed .
* Interaction with resource happens in a blocking way .
*
* Example :
* { { {
* Source . unfoldResource (
* ( ) => new BufferedReader ( new FileReader ( "..." ) ) ,
* reader => Option ( reader . readLine ( ) ) ,
* reader => reader . close ( ) )
* } } }
*
* You can use the supervision strategy to handle exceptions for `read` function . All exceptions thrown by `create`
* or `close` will fail the stream .
*
* `Restart` supervision strategy will close and create blocking IO again . Default strategy is `Stop` which means
* that stream will be terminated on error in `read` function by default .
*
2018-03-07 15:12:34 +01:00
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
2016-02-22 23:22:47 -05:00
* set it for a given Source by using [ [ ActorAttributes ] ] .
*
2017-07-26 16:23:46 +02:00
* Adheres to the [ [ ActorAttributes . SupervisionStrategy ] ] attribute .
*
2016-02-22 23:22:47 -05:00
* @param create - function that is called on stream start and creates / opens resource .
* @param read - function that reads data from opened resource . It is called each time backpressure signal
* is received . Stream calls close and completes when `read` returns None .
* @param close - function that closes resource
*/
2019-02-09 15:25:39 +01:00
def unfoldResource[T, S](create: () => S, read: (S) => Option[T], close: (S) => Unit): Source[T, NotUsed] = {
  // The three callbacks are passed straight through to the stage in create/read/close order.
  val resourceStage = new UnfoldResourceSource(create, read, close)
  Source.fromGraph(resourceStage)
}
/* *
* Start a new `Source` from some resource which can be opened , read and closed .
* It's similar to `unfoldResource` but takes functions that return `Futures` instead of plain values .
*
* You can use the supervision strategy to handle exceptions for `read` function or failures of produced `Futures` .
* All exceptions thrown by `create` or `close` as well as failures of returned futures will fail the stream .
*
* `Restart` supervision strategy will close and create resource . Default strategy is `Stop` which means
* that stream will be terminated on error in `read` function ( or future ) by default .
*
2018-03-07 15:12:34 +01:00
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
2016-02-22 23:22:47 -05:00
* set it for a given Source by using [ [ ActorAttributes ] ] .
*
2017-07-26 16:23:46 +02:00
* Adheres to the [ [ ActorAttributes . SupervisionStrategy ] ] attribute .
*
2016-02-22 23:22:47 -05:00
* @param create - function that is called on stream start and creates / opens resource .
* @param read - function that reads data from opened resource . It is called each time backpressure signal
* is received . Stream calls close and completes when `Future` from read function returns None .
* @param close - function that closes resource
*/
2019-03-13 10:56:20 +01:00
def unfoldResourceAsync[T, S](
    create: () => Future[S],
    read: (S) => Future[Option[T]],
    close: (S) => Future[Done]): Source[T, NotUsed] = {
  // The three Future-returning callbacks are passed straight through to the stage in create/read/close order.
  val resourceStage = new UnfoldResourceSourceAsync(create, read, close)
  Source.fromGraph(resourceStage)
}
2018-01-04 17:21:47 +01:00
2015-01-28 14:19:50 +01:00
}