2014-10-03 17:33:14 +02:00
/* *
2017-01-04 17:37:10 +01:00
* Copyright ( C ) 2015 - 2017 Lightbend Inc . < http : //www.lightbend.com>
2014-10-03 17:33:14 +02:00
*/
package akka.stream.javadsl
2016-01-14 16:20:39 +01:00
import java.util.Optional
2016-01-20 10:00:37 +02:00
import akka. { Done , NotUsed }
2015-04-16 02:24:01 +02:00
import akka.actor. { ActorRef , Props }
2015-11-10 15:15:59 +01:00
import akka.dispatch.ExecutionContexts
2015-04-23 20:59:55 +02:00
import akka.japi.function
2016-08-11 07:37:54 -05:00
import akka.stream.impl. { StreamLayout , SinkQueueAdapter }
2015-04-16 02:24:01 +02:00
import akka.stream. { javadsl , scaladsl , _ }
import org.reactivestreams. { Publisher , Subscriber }
2016-01-14 16:20:39 +01:00
import scala.compat.java8.OptionConverters._
2016-08-11 07:37:54 -05:00
import scala.concurrent.ExecutionContext
2015-03-05 12:21:17 +01:00
import scala.util.Try
2016-01-21 16:37:26 +01:00
import java.util.concurrent.CompletionStage
2016-07-07 07:01:28 -04:00
import scala.compat.java8.FutureConverters._
2014-10-03 17:33:14 +02:00
2014-10-20 14:09:24 +02:00
/* * Java API */
2014-10-03 17:33:14 +02:00
object Sink {
2014-10-17 14:05:50 +02:00
/**
 * Creates a `Sink` that folds the stream: `f` is invoked for every received element with the
 * previously accumulated value (initially `zero`) and the element itself.
 *
 * The materialized [[java.util.concurrent.CompletionStage]] is completed with the value of the
 * final evaluation of `f` when the input stream ends, or completed with `Failure`
 * if a failure is signaled in the stream.
 */
def fold[U, In](zero: U, f: function.Function2[U, In, U]): javadsl.Sink[In, CompletionStage[U]] = {
  // Build the Scala DSL sink first, then adapt its materialized Future for Java callers.
  val folding = scaladsl.Sink.fold[U, In](zero)(f.apply)
  new Sink(folding.toCompletionStage())
}
2014-10-17 14:05:50 +02:00
2016-08-24 21:02:32 +02:00
/**
 * Creates a `Sink` that folds the stream with an asynchronous function: `f` is invoked for every
 * received element with the previously accumulated value (initially `zero`) and the element.
 *
 * The materialized [[java.util.concurrent.CompletionStage]] is completed with the value of the
 * final evaluation of `f` when the input stream ends, or completed with `Failure`
 * if a failure is signaled in the stream.
 */
def foldAsync[U, In](zero: U, f: function.Function2[U, In, CompletionStage[U]]): javadsl.Sink[In, CompletionStage[U]] = {
  // Each CompletionStage returned by `f` is converted to a Scala Future for the Scala DSL stage.
  val folding = scaladsl.Sink.foldAsync[U, In](zero)((acc, elem) ⇒ f.apply(acc, elem).toScala)
  new Sink(folding.toCompletionStage())
}
2016-01-15 22:51:26 -05:00
/**
 * Creates a `Sink` that reduces the stream: starting with the second element, `f` is invoked
 * with its previous output and the current element.
 *
 * The materialized [[java.util.concurrent.CompletionStage]] is completed with the value of the
 * final evaluation of `f` when the input stream ends, or completed with `Failure`
 * if a failure is signaled in the stream.
 *
 * If the stream is empty (i.e. completes before signalling any elements),
 * the reduce stage fails with a [[NoSuchElementException]], which is semantically
 * in line with what Scala's standard library collections do in such situations.
 */
def reduce[In](f: function.Function2[In, In, In]): Sink[In, CompletionStage[In]] = {
  val reducing = scaladsl.Sink.reduce[In](f.apply)
  new Sink(reducing.toCompletionStage())
}
2016-01-15 22:51:26 -05:00
2014-10-17 14:05:50 +02:00
/**
 * Helper that wraps the given `Subscriber` into a [[Sink]].
 *
 * @param subs the subscriber that will receive the elements of the stream
 */
def fromSubscriber[In](subs: Subscriber[In]): Sink[In, NotUsed] = {
  val underlying: scaladsl.Sink[In, NotUsed] = scaladsl.Sink.fromSubscriber(subs)
  new Sink(underlying)
}
2014-10-20 14:09:24 +02:00
/**
 * Creates a `Sink` that cancels its upstream immediately after materialization.
 */
def cancelled[T](): Sink[T, NotUsed] = {
  val underlying = scaladsl.Sink.cancelled[T]
  new Sink(underlying)
}
2014-10-20 14:09:24 +02:00
/**
 * Creates a `Sink` that consumes the stream and discards every element.
 * It materializes into a `CompletionStage` of [[akka.Done]].
 */
def ignore[T](): Sink[T, CompletionStage[Done]] = {
  val discarding = scaladsl.Sink.ignore.toCompletionStage()
  new Sink(discarding)
}
2014-10-17 14:05:50 +02:00
/**
 * Creates a `Sink` that materializes into a [[org.reactivestreams.Publisher]].
 *
 * With `AsPublisher.WITH_FANOUT` the materialized `Publisher` supports multiple `Subscriber`s and
 * the size of the `inputBuffer` configured for this stage becomes the maximum number of elements
 * that the fastest [[org.reactivestreams.Subscriber]] can be ahead of the slowest one before
 * slowing the processing down due to back pressure.
 *
 * With any other value the materialized `Publisher` supports only a single `Subscriber` and
 * rejects any additional `Subscriber`s.
 *
 * @param fanout selects whether the materialized `Publisher` allows more than one subscriber
 */
def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] = {
  // The Scala DSL takes a plain boolean; only WITH_FANOUT enables fan-out.
  val fanoutEnabled = fanout == AsPublisher.WITH_FANOUT
  new Sink(scaladsl.Sink.asPublisher[T](fanoutEnabled))
}
2014-10-20 14:09:24 +02:00
/**
 * Creates a `Sink` that invokes the given procedure for each received element.
 *
 * The sink materializes into a [[java.util.concurrent.CompletionStage]] that is completed with
 * `Success` when reaching the normal end of the stream, or completed with `Failure` if a failure
 * is signaled in the stream.
 */
def foreach[T](f: function.Procedure[T]): Sink[T, CompletionStage[Done]] = {
  val consuming = scaladsl.Sink.foreach[T](f.apply)
  new Sink(consuming.toCompletionStage())
}
2015-06-06 14:36:49 +02:00
2015-06-09 00:05:56 -04:00
/**
 * Creates a `Sink` that invokes the given procedure for each received element in parallel.
 * The sink materializes into a [[java.util.concurrent.CompletionStage]].
 *
 * If `f` throws an exception and the supervision decision is
 * [[akka.stream.Supervision.Stop]] the `CompletionStage` is completed with failure.
 *
 * If `f` throws an exception and the supervision decision is
 * [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] the
 * element is dropped and the stream continues.
 *
 * @param parallel parallelism value handed to the Scala DSL `foreachParallel`
 * @param f        procedure applied to every element
 * @param ec       execution context handed to the Scala DSL `foreachParallel`
 */
def foreachParallel[T](parallel: Int)(f: function.Procedure[T])(ec: ExecutionContext): Sink[T, CompletionStage[Done]] = {
  val consuming = scaladsl.Sink.foreachParallel[T](parallel)(f.apply)(ec)
  new Sink(consuming.toCompletionStage())
}
2014-10-20 14:09:24 +02:00
2014-10-17 14:05:50 +02:00
/**
 * Creates a `Sink` that, once the flow is completed (either through a failure or normal
 * completion), applies the provided callback with [[scala.util.Success]]
 * or [[scala.util.Failure]].
 */
def onComplete[In](callback: function.Procedure[Try[Done]]): Sink[In, NotUsed] = {
  val notifying = scaladsl.Sink.onComplete[In](outcome ⇒ callback.apply(outcome))
  new Sink(notifying)
}
2014-10-17 14:05:50 +02:00
/**
 * Creates a `Sink` that materializes into a `CompletionStage` of the first value received.
 * If the stream completes before signaling at least a single element, the CompletionStage is
 * failed with a [[NoSuchElementException]].
 * If the stream signals an error before signaling at least a single element, the CompletionStage
 * is failed with the stream's exception.
 *
 * See also [[headOption]].
 */
def head[In](): Sink[In, CompletionStage[In]] = {
  val first = scaladsl.Sink.head[In]
  new Sink(first.toCompletionStage())
}
2014-10-20 14:09:24 +02:00
2015-11-10 15:15:59 +01:00
/**
 * Creates a `Sink` that materializes into a `CompletionStage` of the optional first value received.
 * If the stream completes before signaling at least a single element, the value of the
 * CompletionStage is an empty [[java.util.Optional]].
 * If the stream signals an error before signaling at least a single element, the CompletionStage
 * is failed with the stream's exception.
 *
 * See also [[head]].
 */
def headOption[In](): Sink[In, CompletionStage[Optional[In]]] = {
  val first = scaladsl.Sink.headOption[In]
  // Convert Future[Option[In]] to CompletionStage[Optional[In]] on the completing thread.
  new Sink(first.mapMaterializedValue { result ⇒
    result.map(maybe ⇒ maybe.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava
  })
}
2015-11-10 15:15:59 +01:00
2015-11-18 00:09:04 +01:00
/**
 * Creates a `Sink` that materializes into a `CompletionStage` of the last value received.
 * If the stream completes before signaling at least a single element, the CompletionStage is
 * failed with a [[NoSuchElementException]].
 * If the stream signals an error before signaling at least a single element, the CompletionStage
 * is failed with the stream's exception.
 *
 * See also [[lastOption]].
 */
def last[In](): Sink[In, CompletionStage[In]] = {
  val finalElement = scaladsl.Sink.last[In]
  new Sink(finalElement.toCompletionStage())
}
2015-11-18 00:09:04 +01:00
/**
 * Creates a `Sink` that materializes into a `CompletionStage` of the optional last value received.
 * If the stream completes before signaling at least a single element, the value of the
 * CompletionStage is an empty [[java.util.Optional]].
 * If the stream signals an error before signaling at least a single element, the CompletionStage
 * is failed with the stream's exception.
 *
 * See also [[last]].
 */
def lastOption[In](): Sink[In, CompletionStage[Optional[In]]] = {
  val finalElement = scaladsl.Sink.lastOption[In]
  // Convert Future[Option[In]] to CompletionStage[Optional[In]] on the completing thread.
  new Sink(finalElement.mapMaterializedValue { result ⇒
    result.map(maybe ⇒ maybe.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava
  })
}
2015-11-18 00:09:04 +01:00
2015-11-19 00:11:07 +08:00
/**
 * Creates a `Sink` that keeps collecting incoming elements until upstream terminates.
 * As upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit` (and their
 * variants) may be used to ensure boundedness.
 * Materializes into a `CompletionStage` of a `java.util.List` containing all the collected elements.
 * `List` is limited to `Integer.MAX_VALUE` elements, this Sink will cancel the stream
 * after having received that many elements.
 *
 * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
 */
def seq[In]: Sink[In, CompletionStage[java.util.List[In]]] = {
  import scala.collection.JavaConverters._
  val collecting = scaladsl.Sink.seq[In]
  // Expose the collected Scala sequence as a java.util.List and the Future as a CompletionStage.
  new Sink(collecting.mapMaterializedValue { elements ⇒
    elements.map(_.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava
  })
}
2015-03-30 14:42:30 +02:00
/**
 * Creates a `Sink` that sends the elements of the stream to the given `ActorRef`.
 * If the target actor terminates the stream will be canceled.
 * When the stream is completed successfully the given `onCompleteMessage`
 * will be sent to the destination actor.
 * When the stream is completed with failure a [[akka.actor.Status.Failure]]
 * message will be sent to the destination actor.
 *
 * It will request at most `maxInputBufferSize` number of elements from
 * upstream, but there is no back-pressure signal from the destination actor,
 * i.e. if the actor is not consuming the messages fast enough the mailbox
 * of the actor will grow. For potentially slow consumer actors it is recommended
 * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
 * limiting stage in front of this `Sink`.
 *
 * @param ref               destination actor
 * @param onCompleteMessage message sent to `ref` on successful completion
 */
def actorRef[In](ref: ActorRef, onCompleteMessage: Any): Sink[In, NotUsed] = {
  val forwarding = scaladsl.Sink.actorRef[In](ref, onCompleteMessage)
  new Sink(forwarding)
}
2015-10-24 00:07:51 -04:00
/**
 * Creates a `Sink` that sends the elements of the stream to the given `ActorRef`, which in turn
 * sends back a back-pressure signal.
 * The first message is always `onInitMessage`; the stream then waits for the acknowledgement
 * message `ackMessage` from the given actor, which means that it is ready to process
 * elements. An `ackMessage` is also required after each stream element
 * to make back-pressure work.
 *
 * If the target actor terminates the stream will be canceled.
 * When the stream is completed successfully the given `onCompleteMessage`
 * will be sent to the destination actor.
 * When the stream is completed with failure, the result of `onFailureMessage(throwable)`
 * will be sent to the destination actor.
 */
def actorRefWithAck[In](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
                        onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] = {
  val acknowledged = scaladsl.Sink.actorRefWithAck[In](
    ref,
    onInitMessage,
    ackMessage,
    onCompleteMessage,
    onFailureMessage.apply)
  new Sink(acknowledged)
}
2015-03-30 14:42:30 +02:00
/**
 * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] pointing to an Actor
 * created according to the passed in [[akka.actor.Props]]. The Actor created by the `props`
 * should be an [[akka.stream.actor.ActorSubscriber]].
 *
 * @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
 */
@deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
def actorSubscriber[T](props: Props): Sink[T, ActorRef] = {
  val subscribing = scaladsl.Sink.actorSubscriber[T](props)
  new Sink(subscribing)
}
2015-03-04 15:22:33 +01:00
/**
 * A graph with the shape of a sink logically is a sink; this method makes
 * it so also in type.
 *
 * A graph that already is a Java DSL `Sink` is returned unchanged; anything else is wrapped
 * through the Scala DSL `Sink.fromGraph`.
 */
def fromGraph[T, M](g: Graph[SinkShape[T], M]): Sink[T, M] =
  g match {
    case alreadySink: Sink[T, M] ⇒ alreadySink
    case otherGraph              ⇒ new Sink(scaladsl.Sink.fromGraph(otherGraph))
  }
2015-06-29 23:47:31 -04:00
/**
 * Combines several sinks using a fan-out strategy like `Broadcast` or `Balance` and returns
 * the resulting `Sink`.
 *
 * @param output1  first sink to feed
 * @param output2  second sink to feed
 * @param rest     further sinks to feed; `null` is treated as "no further sinks"
 * @param strategy function that is given a number and returns the fan-out graph to use
 */
def combine[T, U](output1: Sink[U, _], output2: Sink[U, _], rest: java.util.List[Sink[U, _]], strategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]]): Sink[T, NotUsed] = {
  import scala.collection.JavaConverters._
  // Unwrap the remaining Java DSL sinks to their Scala DSL counterparts.
  val remaining = if (rest != null) rest.asScala.map(_.asScala) else Seq()
  val combined = scaladsl.Sink.combine(output1.asScala, output2.asScala, remaining: _*)(count ⇒ strategy.apply(count))
  new Sink(combined)
}
2015-08-19 23:04:20 -04:00
/**
 * Creates a `Sink` that is materialized as an [[akka.stream.javadsl.SinkQueue]].
 * The [[akka.stream.javadsl.SinkQueue.pull]] method pulls an element from the stream and returns
 * a `CompletionStage` of the optional element.
 * The `CompletionStage` completes when an element is available.
 *
 * Before calling the pull method a second time you need to wait until the previous
 * CompletionStage completes.
 * Pull returns a failed future with ''IllegalStateException'' if the previous future has not yet
 * completed.
 *
 * The `Sink` will request at most a number of elements equal to the size of `inputBuffer` from
 * upstream and then stop back pressure. You can configure the size of the input
 * buffer by using the [[Sink.withAttributes]] method.
 *
 * For stream completion you need to pull all elements from [[akka.stream.javadsl.SinkQueue]],
 * including the final empty value that acts as the completion marker.
 *
 * @see [[akka.stream.javadsl.SinkQueueWithCancel]]
 */
def queue[T](): Sink[T, SinkQueueWithCancel[T]] = {
  val pulling = scaladsl.Sink.queue[T]()
  // Wrap the Scala DSL queue in the adapter that exposes the Java DSL interface.
  new Sink(pulling.mapMaterializedValue(scalaQueue ⇒ new SinkQueueAdapter(scalaQueue)))
}
2016-07-07 07:01:28 -04:00
/**
 * Creates a real `Sink` upon receiving the first element. The internal `Sink` will not be created
 * if there are no elements, because of completion or error.
 *
 * If `sinkFactory` throws an exception and the supervision decision is
 * [[akka.stream.Supervision.Stop]] the materialized `CompletionStage` will be completed with
 * failure. For all other supervision options it will try to create the sink with the next element.
 *
 * `fallback` will be executed when there were no elements and completion is received from upstream.
 *
 * @param sinkFactory asynchronously creates the internal sink from the first element
 * @param fallback    supplies the materialized value when no element was received
 */
def lazyInit[T, M](sinkFactory: function.Function[T, CompletionStage[Sink[T, M]]], fallback: function.Creator[M]): Sink[T, CompletionStage[M]] = {
  val deferred = scaladsl.Sink.lazyInit[T, M](
    // Convert the Java factory result into a Future of the unwrapped Scala DSL sink.
    first ⇒ sinkFactory.apply(first).toScala.map(created ⇒ created.asScala)(ExecutionContexts.sameThreadExecutionContext),
    () ⇒ fallback.create())
  new Sink(deferred.mapMaterializedValue(materialized ⇒ materialized.toJava))
}
2014-10-03 17:33:14 +02:00
}
2014-10-17 14:05:50 +02:00
2014-10-03 17:33:14 +02:00
/**
 * Java API
 *
 * A `Sink` is a set of stream processing steps that has one open input.
 * Can be used as a `Subscriber`.
 *
 * Every operation is forwarded to the wrapped Scala DSL sink.
 */
final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[In], Mat] {

  /** The shape of the wrapped Scala DSL sink. */
  override def shape: SinkShape[In] = delegate.shape

  /** The module of the wrapped Scala DSL sink. */
  def module: StreamLayout.Module = delegate.module

  /** The string representation of the wrapped Scala DSL sink. */
  override def toString: String = delegate.toString

  /**
   * Converts this Sink to its Scala DSL counterpart.
   */
  def asScala: scaladsl.Sink[In, Mat] = delegate

  /**
   * Connect this `Sink` to a `Source` and run it.
   *
   * @return the value produced by the Scala DSL `runWith` for the given source
   */
  def runWith[M](source: Graph[SourceShape[In], M], materializer: Materializer): M =
    delegate.runWith(source)(materializer)

  /**
   * Transform this Sink by applying a function to each *incoming* upstream element before
   * it is passed to the [[Sink]]
   *
   * '''Backpressures when''' original [[Sink]] backpressures
   *
   * '''Cancels when''' original [[Sink]] cancels
   */
  def contramap[In2](f: function.Function[In2, In]): Sink[In2, Mat] = {
    // Prepend a mapping flow and keep only this sink's materialized value.
    val mapping = javadsl.Flow.fromFunction(f)
    mapping.toMat(this, Keep.right[NotUsed, Mat])
  }

  /**
   * Transform only the materialized value of this Sink, leaving all other properties as they were.
   */
  def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] =
    new Sink(delegate.mapMaterializedValue(mat ⇒ f.apply(mat)))

  /**
   * Change the attributes of this [[Sink]] to the given ones and seal the list
   * of attributes. This means that further calls will not be able to remove these
   * attributes, but instead add new ones. Note that this
   * operation has no effect on an empty Flow (because the attributes apply
   * only to the contained processing stages).
   */
  override def withAttributes(attr: Attributes): javadsl.Sink[In, Mat] = {
    val updated = delegate.withAttributes(attr)
    new Sink(updated)
  }

  /**
   * Add the given attributes to this Sink. Further calls to `withAttributes`
   * will not remove these attributes. Note that this
   * operation has no effect on an empty Flow (because the attributes apply
   * only to the contained processing stages).
   */
  override def addAttributes(attr: Attributes): javadsl.Sink[In, Mat] = {
    val updated = delegate.addAttributes(attr)
    new Sink(updated)
  }

  /**
   * Add a ``name`` attribute to this Sink.
   */
  override def named(name: String): javadsl.Sink[In, Mat] = {
    val updated = delegate.named(name)
    new Sink(updated)
  }

  /**
   * Put an asynchronous boundary around this `Sink`
   */
  override def async: javadsl.Sink[In, Mat] = {
    val updated = delegate.async
    new Sink(updated)
  }
}