2014-09-03 21:54:18 +02:00
/* *
2018-01-04 17:26:29 +00:00
* Copyright ( C ) 2014 - 2018 Lightbend Inc . < https : //www.lightbend.com>
2014-09-03 21:54:18 +02:00
*/
2014-10-27 14:35:41 +01:00
package akka.stream.scaladsl
2014-09-03 21:54:18 +02:00
2016-01-20 10:00:37 +02:00
import akka. { Done , NotUsed }
2015-11-10 15:15:59 +01:00
import akka.dispatch.ExecutionContexts
2016-08-09 21:08:31 -05:00
import akka.actor. { ActorRef , Props , Status }
2015-10-09 15:11:01 -04:00
import akka.stream.actor.ActorSubscriber
2015-08-19 23:04:20 -04:00
import akka.stream.impl.Stages.DefaultAttributes
2015-01-28 14:19:50 +01:00
import akka.stream.impl._
2016-10-26 15:38:49 +05:00
import akka.stream.impl.fusing.GraphStages
2017-11-23 10:26:00 +01:00
import akka.stream.stage._
2015-08-19 23:04:20 -04:00
import akka.stream. { javadsl , _ }
2015-01-28 14:19:50 +01:00
import org.reactivestreams. { Publisher , Subscriber }
2016-08-09 21:08:31 -05:00
2015-06-29 23:47:31 -04:00
import scala.annotation.tailrec
2017-11-29 13:49:31 -03:00
import scala.collection.generic.CanBuildFrom
2016-02-12 01:36:21 +08:00
import scala.collection.immutable
2016-08-09 21:08:31 -05:00
import scala.concurrent. { ExecutionContext , Future }
2015-04-16 02:24:01 +02:00
import scala.util. { Failure , Success , Try }
2014-09-03 21:54:18 +02:00
/* *
2016-08-11 07:37:54 -05:00
* A `Sink` is a set of stream processing steps that has one open input .
2014-10-02 17:32:08 +02:00
* Can be used as a `Subscriber`
2014-09-03 21:54:18 +02:00
*/
2016-07-27 13:29:23 +02:00
final class Sink [ - In , + Mat ] (
override val traversalBuilder : LinearTraversalBuilder ,
2017-03-08 14:22:19 +01:00
override val shape : SinkShape [ In ] )
2015-01-28 14:19:50 +01:00
extends Graph [ SinkShape [ In ] , Mat ] {
2016-07-27 13:29:23 +02:00
// TODO: Debug string
override def toString : String = s" Sink( $shape ) "
2016-03-11 17:08:30 +01:00
2016-02-10 12:18:24 +01:00
/* *
* Transform this Sink by applying a function to each * incoming * upstream element before
* it is passed to the [ [ Sink ] ]
*
* ''' Backpressures when ''' original [ [ Sink ] ] backpressures
*
* ''' Cancels when ''' original [ [ Sink ] ] backpressures
*/
def contramap [ In2 ] ( f : In2 ⇒ In ) : Sink [ In2 , Mat ] = Flow . fromFunction ( f ) . toMat ( this ) ( Keep . right )
2014-10-02 13:34:27 +02:00
/* *
2014-10-17 14:05:50 +02:00
* Connect this `Sink` to a `Source` and run it . The returned value is the materialized value
2015-03-06 10:23:26 +01:00
* of the `Source` , e . g . the `Subscriber` of a [ [ Source # subscriber ] ] .
2014-10-02 13:34:27 +02:00
*/
2015-06-23 18:28:53 +02:00
def runWith [ Mat2 ] ( source : Graph [ SourceShape [ In ] , Mat2 ] ) ( implicit materializer : Materializer ) : Mat2 =
2015-10-21 22:45:39 +02:00
Source . fromGraph ( source ) . to ( this ) . run ( )
2015-01-28 14:19:50 +01:00
2016-08-11 07:37:54 -05:00
/* *
* Transform only the materialized value of this Sink , leaving all other properties as they were .
*/
2015-05-05 10:29:41 +02:00
def mapMaterializedValue [ Mat2 ] ( f : Mat ⇒ Mat2 ) : Sink [ In , Mat2 ] =
2016-07-27 13:29:23 +02:00
new Sink (
traversalBuilder . transformMat ( f . asInstanceOf [ Any ⇒ Any ] ) ,
2017-03-08 14:22:19 +01:00
shape )
2014-10-10 10:39:29 +02:00
2015-12-22 20:56:02 +01:00
/* *
2017-11-23 10:26:00 +01:00
* Replace the attributes of this [ [ Sink ] ] with the given ones . If this Sink is a composite
* of multiple graphs , new attributes on the composite will be less specific than attributes
* set directly on the individual graphs of the composite .
2015-12-22 20:56:02 +01:00
*/
2015-06-23 17:32:55 +02:00
override def withAttributes ( attr : Attributes ) : Sink [ In , Mat ] =
2016-07-27 13:29:23 +02:00
new Sink (
traversalBuilder . setAttributes ( attr ) ,
2017-03-08 14:22:19 +01:00
shape )
2015-03-05 12:21:17 +01:00
2015-12-22 20:56:02 +01:00
/* *
2017-11-23 10:26:00 +01:00
* Add the given attributes to this [ [ Sink ] ] . If the specific attribute was already present
* on this graph this means the added attribute will be more specific than the existing one .
* If this Sink is a composite of multiple graphs , new attributes on the composite will be
* less specific than attributes set directly on the individual graphs of the composite .
2015-12-22 20:56:02 +01:00
*/
override def addAttributes ( attr : Attributes ) : Sink [ In , Mat ] =
2016-07-27 13:29:23 +02:00
withAttributes ( traversalBuilder . attributes and attr )
2015-12-22 20:56:02 +01:00
/* *
2016-08-11 07:37:54 -05:00
* Add a ` `name` ` attribute to this Sink .
2015-12-22 20:56:02 +01:00
*/
2016-02-10 13:56:38 +01:00
override def named ( name : String ) : Sink [ In , Mat ] = addAttributes ( Attributes . name ( name ) )
/* *
2017-11-23 10:26:00 +01:00
* Put an asynchronous boundary around this `Source`
2016-02-10 13:56:38 +01:00
*/
2017-11-23 10:26:00 +01:00
override def async : Sink [ In , Mat ] = super . async . asInstanceOf [ Sink [ In , Mat ] ]
/* *
* Put an asynchronous boundary around this `Graph`
*
* @param dispatcher Run the graph on this dispatcher
*/
override def async ( dispatcher : String ) : Sink [ In , Mat ] =
super . async ( dispatcher ) . asInstanceOf [ Sink [ In , Mat ] ]
/* *
* Put an asynchronous boundary around this `Graph`
*
* @param dispatcher Run the graph on this dispatcher
* @param inputBufferSize Set the input buffer to this size for the graph
*/
override def async ( dispatcher : String , inputBufferSize : Int ) : Sink [ In , Mat ] =
super . async ( dispatcher , inputBufferSize ) . asInstanceOf [ Sink [ In , Mat ] ]
2015-03-06 12:22:14 +01:00
2016-08-11 07:37:54 -05:00
/* *
* Converts this Scala DSL element to it 's Java DSL counterpart .
*/
2015-03-06 12:22:14 +01:00
def asJava : javadsl.Sink [ In , Mat ] = new javadsl . Sink ( this )
2014-09-03 21:54:18 +02:00
}
2014-10-08 15:15:46 +02:00
2015-10-21 22:45:39 +02:00
object Sink {
2015-01-28 14:19:50 +01:00
2015-04-16 02:24:01 +02:00
/* * INTERNAL API */
2016-05-03 18:58:26 -07:00
def shape [ T ] ( name : String ) : SinkShape [ T ] = SinkShape ( Inlet ( name + ".in" ) )
2015-01-28 14:19:50 +01:00
2014-10-08 15:15:46 +02:00
/* *
2015-03-04 15:22:33 +01:00
* A graph with the shape of a sink logically is a sink , this method makes
2015-01-28 14:19:50 +01:00
* it so also in type .
2014-10-08 15:15:46 +02:00
*/
2015-10-21 22:45:39 +02:00
def fromGraph [ T , M ] ( g : Graph [ SinkShape [ T ] , M ] ) : Sink [ T , M ] =
2015-06-06 17:17:23 +02:00
g match {
2015-10-21 22:45:39 +02:00
case s : Sink [ T , M ] ⇒ s
case s : javadsl.Sink [ T , M ] ⇒ s . asScala
2017-11-23 10:26:00 +01:00
case g : GraphStageWithMaterializedValue [ SinkShape [ T ] , M ] ⇒
// move these from the stage itself to make the returned source
// behave as it is the stage with regards to attributes
val attrs = g . traversalBuilder . attributes
val noAttrStage = g . withAttributes ( Attributes . none )
new Sink (
LinearTraversalBuilder . fromBuilder ( noAttrStage . traversalBuilder , noAttrStage . shape , Keep . right ) ,
noAttrStage . shape
) . withAttributes ( attrs )
2016-07-27 13:29:23 +02:00
case other ⇒ new Sink (
LinearTraversalBuilder . fromBuilder ( other . traversalBuilder , other . shape , Keep . right ) ,
2017-03-08 14:22:19 +01:00
other . shape )
2015-06-06 17:17:23 +02:00
}
2014-10-10 10:39:29 +02:00
/* *
2015-01-28 14:19:50 +01:00
* Helper to create [ [ Sink ] ] from `Subscriber` .
2014-10-10 10:39:29 +02:00
*/
2016-01-20 10:00:37 +02:00
def fromSubscriber [ T ] ( subscriber : Subscriber [ T ] ) : Sink [ T , NotUsed ] =
2016-07-27 13:29:23 +02:00
fromGraph ( new SubscriberSink ( subscriber , DefaultAttributes . subscriberSink , shape ( "SubscriberSink" ) ) )
2014-10-17 14:05:50 +02:00
/* *
* A `Sink` that immediately cancels its upstream after materialization .
*/
2016-01-20 10:00:37 +02:00
def cancelled [ T ] : Sink [ T , NotUsed ] =
2016-07-27 13:29:23 +02:00
fromGraph [ Any , NotUsed ] ( new CancelSink ( DefaultAttributes . cancelledSink , shape ( "CancelledSink" ) ) )
2014-10-17 14:05:50 +02:00
/* *
* A `Sink` that materializes into a `Future` of the first value received .
2015-11-10 15:15:59 +01:00
* If the stream completes before signaling at least a single element , the Future will be failed with a [ [ NoSuchElementException ] ] .
* If the stream signals an error errors before signaling at least a single element , the Future will be failed with the streams exception .
*
* See also [ [ headOption ] ] .
*/
2015-11-18 00:09:04 +01:00
def head [ T ] : Sink [ T , Future [ T ] ] =
Sink . fromGraph ( new HeadOptionStage [ T ] ) . withAttributes ( DefaultAttributes . headSink )
. mapMaterializedValue ( e ⇒ e . map ( _ . getOrElse ( throw new NoSuchElementException ( "head of empty stream" ) ) ) ( ExecutionContexts . sameThreadExecutionContext ) )
2015-11-10 15:15:59 +01:00
/* *
* A `Sink` that materializes into a `Future` of the optional first value received .
* If the stream completes before signaling at least a single element , the value of the Future will be [ [ None ] ] .
* If the stream signals an error errors before signaling at least a single element , the Future will be failed with the streams exception .
*
* See also [ [ head ] ] .
2014-10-17 14:05:50 +02:00
*/
2015-11-18 00:09:04 +01:00
def headOption [ T ] : Sink [ T , Future [ Option [ T ] ] ] =
Sink . fromGraph ( new HeadOptionStage [ T ] ) . withAttributes ( DefaultAttributes . headOptionSink )
/* *
* A `Sink` that materializes into a `Future` of the last value received .
* If the stream completes before signaling at least a single element , the Future will be failed with a [ [ NoSuchElementException ] ] .
2017-04-27 17:31:33 +02:00
* If the stream signals an error , the Future will be failed with the stream 's exception .
2015-11-18 00:09:04 +01:00
*
* See also [ [ lastOption ] ] .
*/
def last [ T ] : Sink [ T , Future [ T ] ] = Sink . fromGraph ( new LastOptionStage [ T ] ) . withAttributes ( DefaultAttributes . lastSink )
. mapMaterializedValue ( e ⇒ e . map ( _ . getOrElse ( throw new NoSuchElementException ( "last of empty stream" ) ) ) ( ExecutionContexts . sameThreadExecutionContext ) )
/* *
* A `Sink` that materializes into a `Future` of the optional last value received .
* If the stream completes before signaling at least a single element , the value of the Future will be [ [ None ] ] .
2017-04-27 17:31:33 +02:00
* If the stream signals an error , the Future will be failed with the stream 's exception .
2015-11-18 00:09:04 +01:00
*
* See also [ [ last ] ] .
*/
def lastOption [ T ] : Sink [ T , Future [ Option [ T ] ] ] = Sink . fromGraph ( new LastOptionStage [ T ] ) . withAttributes ( DefaultAttributes . lastOptionSink )
2014-10-17 14:05:50 +02:00
2015-11-19 00:11:07 +08:00
/* *
* A `Sink` that keeps on collecting incoming elements until upstream terminates .
2015-12-22 19:49:09 +01:00
* As upstream may be unbounded , `Flow[T].take` or the stricter `Flow[T].limit` ( and their variants )
2015-11-19 00:11:07 +08:00
* may be used to ensure boundedness .
2015-12-22 19:49:09 +01:00
* Materializes into a `Future` of `Seq[T]` containing all the collected elements .
2016-01-17 15:04:45 +01:00
* `Seq` is limited to `Int.MaxValue` elements , this Sink will cancel the stream
* after having received that many elements .
2015-11-19 00:11:07 +08:00
*
* See also [ [ Flow . limit ] ] , [ [ Flow . limitWeighted ] ] , [ [ Flow . take ] ] , [ [ Flow . takeWithin ] ] , [ [ Flow . takeWhile ] ]
*/
2017-11-29 13:49:31 -03:00
def seq [ T ] : Sink [ T , Future [ immutable . Seq [ T ] ] ] = Sink . fromGraph ( new SeqStage [ T , Vector [ T ] ] )
/* *
* A `Sink` that keeps on collecting incoming elements until upstream terminates .
* As upstream may be unbounded , `Flow[T].take` or the stricter `Flow[T].limit` ( and their variants )
* may be used to ensure boundedness .
* Materializes into a `Future` of `That[T]` containing all the collected elements .
* `That[T]` is limited to the limitations of the CanBuildFrom associated with it . For example , `Seq` is limited to
* `Int.MaxValue` elements . See [ The Architecture of Scala Collectionss ] ( https : //docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html) for more info.
* This Sink will cancel the stream after having received that many elements .
*
* See also [ [ Flow . limit ] ] , [ [ Flow . limitWeighted ] ] , [ [ Flow . take ] ] , [ [ Flow . takeWithin ] ] , [ [ Flow . takeWhile ] ]
*/
def collection [ T , That ] ( implicit cbf : CanBuildFrom [ Nothing , T , That with immutable . Traversable [ _ ] ] ) : Sink [ T , Future [ That ] ] =
Sink . fromGraph ( new SeqStage [ T , That ] )
2015-11-19 00:11:07 +08:00
2014-10-17 14:05:50 +02:00
/* *
* A `Sink` that materializes into a [ [ org . reactivestreams . Publisher ] ] .
2015-10-30 16:00:44 +01:00
*
2015-11-03 12:53:24 +01:00
* If `fanout` is `true` , the materialized `Publisher` will support multiple `Subscriber` s and
* the size of the `inputBuffer` configured for this stage becomes the maximum number of elements that
* the fastest [ [ org . reactivestreams . Subscriber ] ] can be ahead of the slowest one before slowing
* the processing down due to back pressure .
*
* If `fanout` is `false` then the materialized `Publisher` will only support a single `Subscriber` and
* reject any additional `Subscriber` s .
2014-10-17 14:05:50 +02:00
*/
2015-12-17 11:48:30 +02:00
def asPublisher [ T ] ( fanout : Boolean ) : Sink [ T , Publisher [ T ] ] =
2016-07-27 13:29:23 +02:00
fromGraph (
2015-11-03 12:53:24 +01:00
if ( fanout ) new FanoutPublisherSink [ T ] ( DefaultAttributes . fanoutPublisherSink , shape ( "FanoutPublisherSink" ) )
else new PublisherSink [ T ] ( DefaultAttributes . publisherSink , shape ( "PublisherSink" ) ) )
2015-01-28 14:19:50 +01:00
/* *
* A `Sink` that will consume the stream and discard the elements .
*/
2016-10-26 15:38:49 +05:00
def ignore : Sink [ Any , Future [ Done ] ] = fromGraph ( GraphStages . IgnoreSink )
2014-10-17 14:05:50 +02:00
/* *
* A `Sink` that will invoke the given procedure for each received element . The sink is materialized
* into a [ [ scala . concurrent . Future ] ] will be completed with `Success` when reaching the
2015-01-30 10:30:56 +01:00
* normal end of the stream , or completed with `Failure` if there is a failure signaled in
2014-10-17 14:05:50 +02:00
* the stream . .
*/
2016-01-20 10:00:37 +02:00
def foreach [ T ] ( f : T ⇒ Unit ) : Sink [ T , Future [ Done ] ] =
2015-06-14 03:12:30 -04:00
Flow [ T ] . map ( f ) . toMat ( Sink . ignore ) ( Keep . right ) . named ( "foreachSink" )
2014-10-17 14:05:50 +02:00
2015-06-29 23:47:31 -04:00
/* *
2016-02-21 13:03:00 +02:00
* Combine several sinks with fan - out strategy like `Broadcast` or `Balance` and returns `Sink` .
2015-06-29 23:47:31 -04:00
*/
2016-01-20 10:00:37 +02:00
def combine [ T , U ] ( first : Sink [ U , _ ] , second : Sink [ U , _ ] , rest : Sink [ U , _ ] * ) ( strategy : Int ⇒ Graph [ UniformFanOutShape [ T , U ] , NotUsed ] ) : Sink [ T , NotUsed ] =
2015-06-29 23:47:31 -04:00
2015-11-30 15:45:37 +01:00
Sink . fromGraph ( GraphDSL . create ( ) { implicit b ⇒
import GraphDSL.Implicits._
2015-06-29 23:47:31 -04:00
val d = b . add ( strategy ( rest . size + 2 ) )
d . out ( 0 ) ~> first
d . out ( 1 ) ~> second
@tailrec def combineRest ( idx : Int , i : Iterator [ Sink [ U , _ ] ] ) : SinkShape [ T ] =
if ( i . hasNext ) {
d . out ( idx ) ~> i . next ( )
combineRest ( idx + 1 , i )
} else new SinkShape ( d . in )
combineRest ( 2 , rest . iterator )
} )
2015-06-09 00:05:56 -04:00
/* *
* A `Sink` that will invoke the given function to each of the elements
* as they pass in . The sink is materialized into a [ [ scala . concurrent . Future ] ]
*
* If `f` throws an exception and the supervision decision is
* [ [ akka . stream . Supervision . Stop ] ] the `Future` will be completed with failure .
*
* If `f` throws an exception and the supervision decision is
* [ [ akka . stream . Supervision . Resume ] ] or [ [ akka . stream . Supervision . Restart ] ] the
* element is dropped and the stream continues .
*
2016-08-11 07:37:54 -05:00
* See also [ [ Flow . mapAsyncUnordered ] ]
2015-06-09 00:05:56 -04:00
*/
2016-01-20 10:00:37 +02:00
def foreachParallel [ T ] ( parallelism : Int ) ( f : T ⇒ Unit ) ( implicit ec : ExecutionContext ) : Sink [ T , Future [ Done ] ] =
2015-06-14 03:12:30 -04:00
Flow [ T ] . mapAsyncUnordered ( parallelism ) ( t ⇒ Future ( f ( t ) ) ) . toMat ( Sink . ignore ) ( Keep . right )
2015-06-09 00:05:56 -04:00
2014-10-17 14:05:50 +02:00
/* *
* A `Sink` that will invoke the given function for every received element , giving it its previous
* output ( or the given `zero` value ) and the element as input .
* The returned [ [ scala . concurrent . Future ] ] will be completed with value of the final
* function evaluation when the input stream ends , or completed with `Failure`
2015-01-30 10:30:56 +01:00
* if there is a failure signaled in the stream .
2016-08-24 21:02:32 +02:00
*
* @see [ [ # foldAsync ] ]
2014-10-17 14:05:50 +02:00
*/
2015-06-14 03:12:30 -04:00
def fold [ U , T ] ( zero : U ) ( f : ( U , T ) ⇒ U ) : Sink [ T , Future [ U ] ] =
Flow [ T ] . fold ( zero ) ( f ) . toMat ( Sink . head ) ( Keep . right ) . named ( "foldSink" )
2014-10-17 14:05:50 +02:00
2016-08-24 21:02:32 +02:00
/* *
* A `Sink` that will invoke the given asynchronous function for every received element , giving it its previous
* output ( or the given `zero` value ) and the element as input .
* The returned [ [ scala . concurrent . Future ] ] will be completed with value of the final
* function evaluation when the input stream ends , or completed with `Failure`
* if there is a failure signaled in the stream .
*
* @see [ [ # fold ] ]
*/
def foldAsync [ U , T ] ( zero : U ) ( f : ( U , T ) ⇒ Future [ U ] ) : Sink [ T , Future [ U ] ] = Flow [ T ] . foldAsync ( zero ) ( f ) . toMat ( Sink . head ) ( Keep . right ) . named ( "foldAsyncSink" )
2016-01-15 22:51:26 -05:00
/* *
* A `Sink` that will invoke the given function for every received element , giving it its previous
* output ( from the second element ) and the element as input .
* The returned [ [ scala . concurrent . Future ] ] will be completed with value of the final
* function evaluation when the input stream ends , or completed with `Failure`
* if there is a failure signaled in the stream .
2016-04-11 15:36:10 +02:00
*
* If the stream is empty ( i . e . completes before signalling any elements ) ,
* the reduce stage will fail its downstream with a [ [ NoSuchElementException ] ] ,
* which is semantically in - line with that Scala 's standard library collections
* do in such situations .
2017-07-26 16:23:46 +02:00
*
* Adheres to the [ [ ActorAttributes . SupervisionStrategy ] ] attribute .
2016-01-15 22:51:26 -05:00
*/
def reduce [ T ] ( f : ( T , T ) ⇒ T ) : Sink [ T , Future [ T ] ] =
Flow [ T ] . reduce ( f ) . toMat ( Sink . head ) ( Keep . right ) . named ( "reduceSink" )
2014-10-17 14:05:50 +02:00
/* *
2015-01-30 10:30:56 +01:00
* A `Sink` that when the flow is completed , either through a failure or normal
2014-10-17 14:05:50 +02:00
* completion , apply the provided function with [ [ scala . util . Success ] ]
* or [ [ scala . util . Failure ] ] .
*/
2016-01-20 10:00:37 +02:00
def onComplete [ T ] ( callback : Try [ Done ] ⇒ Unit ) : Sink [ T , NotUsed ] = {
2015-01-28 14:19:50 +01:00
2016-08-09 21:08:31 -05:00
def newOnCompleteStage ( ) : GraphStage [ FlowShape [ T , NotUsed ] ] = {
new GraphStage [ FlowShape [ T , NotUsed ] ] {
val in = Inlet [ T ] ( "in" )
val out = Outlet [ NotUsed ] ( "out" )
override val shape = FlowShape . of ( in , out )
override def createLogic ( inheritedAttributes : Attributes ) : GraphStageLogic =
new GraphStageLogic ( shape ) with InHandler with OutHandler {
2015-06-09 00:05:56 -04:00
2017-04-28 11:11:50 +02:00
var completionSignalled = false
2016-08-09 21:08:31 -05:00
override def onPush ( ) : Unit = pull ( in )
2015-06-09 00:05:56 -04:00
2016-08-09 21:08:31 -05:00
override def onPull ( ) : Unit = pull ( in )
override def onUpstreamFailure ( cause : Throwable ) : Unit = {
callback ( Failure ( cause ) )
2017-04-28 11:11:50 +02:00
completionSignalled = true
2016-08-09 21:08:31 -05:00
failStage ( cause )
}
override def onUpstreamFinish ( ) : Unit = {
callback ( Success ( Done ) )
2017-04-28 11:11:50 +02:00
completionSignalled = true
2016-08-09 21:08:31 -05:00
completeStage ( )
}
2017-04-28 11:11:50 +02:00
override def postStop ( ) : Unit = {
if ( ! completionSignalled ) callback ( Failure ( new AbruptStageTerminationException ( this ) ) )
}
2016-08-09 21:08:31 -05:00
setHandlers ( in , out , this )
2017-04-28 11:11:50 +02:00
2016-08-09 21:08:31 -05:00
}
2015-01-28 14:19:50 +01:00
}
}
2016-08-09 21:08:31 -05:00
Flow [ T ] . via ( newOnCompleteStage ( ) ) . to ( Sink . ignore ) . named ( "onCompleteSink" )
2015-01-28 14:19:50 +01:00
}
2015-03-30 14:42:30 +02:00
/* *
* Sends the elements of the stream to the given `ActorRef` .
2015-09-28 22:23:59 -07:00
* If the target actor terminates the stream will be canceled .
2015-03-30 14:42:30 +02:00
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor .
* When the stream is completed with failure a [ [ akka . actor . Status . Failure ] ]
* message will be sent to the destination actor .
*
* It will request at most `maxInputBufferSize` number of elements from
* upstream , but there is no back - pressure signal from the destination actor ,
* i . e . if the actor is not consuming the messages fast enough the mailbox
* of the actor will grow . For potentially slow consumer actors it is recommended
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
2015-03-31 15:13:57 +02:00
* limiting stage in front of this `Sink` .
2015-03-30 14:42:30 +02:00
*/
2016-01-20 10:00:37 +02:00
def actorRef [ T ] ( ref : ActorRef , onCompleteMessage : Any ) : Sink [ T , NotUsed ] =
2016-07-27 13:29:23 +02:00
fromGraph ( new ActorRefSink ( ref , onCompleteMessage , DefaultAttributes . actorRefSink , shape ( "ActorRefSink" ) ) )
2015-03-30 14:42:30 +02:00
2015-10-24 00:07:51 -04:00
/* *
* Sends the elements of the stream to the given `ActorRef` that sends back back - pressure signal .
* First element is always `onInitMessage` , then stream is waiting for acknowledgement message
* `ackMessage` from the given actor which means that it is ready to process
* elements . It also requires `ackMessage` message after each stream element
* to make backpressure work .
*
* If the target actor terminates the stream will be canceled .
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor .
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor .
*/
def actorRefWithAck [ T ] ( ref : ActorRef , onInitMessage : Any , ackMessage : Any , onCompleteMessage : Any ,
2016-01-20 10:00:37 +02:00
onFailureMessage : ( Throwable ) ⇒ Any = Status . Failure ) : Sink [ T , NotUsed ] =
2015-10-24 00:07:51 -04:00
Sink . fromGraph ( new ActorRefBackpressureSinkStage ( ref , onInitMessage , ackMessage , onCompleteMessage , onFailureMessage ) )
2015-03-30 14:42:30 +02:00
/* *
* Creates a `Sink` that is materialized to an [ [ akka . actor . ActorRef ] ] which points to an Actor
2015-10-09 15:11:01 -04:00
* created according to the passed in [ [ akka . actor . Props ] ] . Actor created by the `props` must
2015-03-30 14:42:30 +02:00
* be [ [ akka . stream . actor . ActorSubscriber ] ] .
2016-12-09 14:08:13 +01:00
*
2016-12-08 17:22:01 +01:00
* @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead , it allows for all operations an Actor would and is more type - safe as well as guaranteed to be ReactiveStreams compliant .
2015-03-30 14:42:30 +02:00
*/
2016-12-08 17:22:01 +01:00
@deprecated ( "Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant." , since = "2.5.0" )
2015-10-09 15:11:01 -04:00
def actorSubscriber [ T ] ( props : Props ) : Sink [ T , ActorRef ] = {
require ( classOf [ ActorSubscriber ] . isAssignableFrom ( props . actorClass ( ) ) , "Actor must be ActorSubscriber" )
2016-07-27 13:29:23 +02:00
fromGraph ( new ActorSubscriberSink ( props , DefaultAttributes . actorSubscriberSink , shape ( "ActorSubscriberSink" ) ) )
2015-10-09 15:11:01 -04:00
}
2015-03-30 14:42:30 +02:00
2015-08-19 23:04:20 -04:00
/* *
2016-01-14 15:22:25 +01:00
* Creates a `Sink` that is materialized as an [ [ akka . stream . scaladsl . SinkQueue ] ] .
* [ [ akka . stream . scaladsl . SinkQueue . pull ] ] method is pulling element from the stream and returns ` `Future[Option[T]]` ` .
2015-08-19 23:04:20 -04:00
* `Future` completes when element is available .
*
2015-12-04 09:37:32 -05:00
* Before calling pull method second time you need to wait until previous Future completes .
* Pull returns Failed future with ' 'IllegalStateException' ' if previous future has not yet completed .
2015-08-19 23:04:20 -04:00
*
2015-12-04 09:37:32 -05:00
* `Sink` will request at most number of elements equal to size of `inputBuffer` from
* upstream and then stop back pressure . You can configure size of input
* buffer by using [ [ Sink . withAttributes ] ] method .
*
2016-01-14 15:22:25 +01:00
* For stream completion you need to pull all elements from [ [ akka . stream . scaladsl . SinkQueue ] ] including last None
2015-12-04 09:37:32 -05:00
* as completion marker
*
2016-08-11 07:37:54 -05:00
* See also [ [ akka . stream . scaladsl . SinkQueueWithCancel ] ]
2015-08-19 23:04:20 -04:00
*/
2016-01-14 15:22:25 +01:00
def queue [ T ] ( ) : Sink [ T , SinkQueueWithCancel [ T ] ] =
2016-02-15 13:38:37 +01:00
Sink . fromGraph ( new QueueSink ( ) )
2016-07-07 07:01:28 -04:00
/* *
* Creates a real `Sink` upon receiving the first element . Internal `Sink` will not be created if there are no elements ,
* because of completion or error .
*
* If `sinkFactory` throws an exception and the supervision decision is
* [ [ akka . stream . Supervision . Stop ] ] the `Future` will be completed with failure . For all other supervision options it will
* try to create sink with next element
*
* `fallback` will be executed when there was no elements and completed is received from upstream .
2017-07-26 16:23:46 +02:00
*
* Adheres to the [ [ ActorAttributes . SupervisionStrategy ] ] attribute .
2016-07-07 07:01:28 -04:00
*/
def lazyInit [ T , M ] ( sinkFactory : T ⇒ Future [ Sink [ T , M ] ] , fallback : ( ) ⇒ M ) : Sink [ T , Future [ M ] ] =
Sink . fromGraph ( new LazySink ( sinkFactory , fallback ) )
2014-10-17 14:05:50 +02:00
}