// Source: pekko/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala (440 lines, 19 KiB, Scala)

/**
 * Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
 */
package akka.stream.scaladsl
import akka.{ Done, NotUsed }
import akka.dispatch.ExecutionContexts
import akka.actor.{ ActorRef, Props, Status }
import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl._
import akka.stream.impl.fusing.GraphStages
import akka.stream.stage._
import akka.stream.{ javadsl, _ }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
/**
 * A `Sink` is a set of stream processing steps that has one open input.
 * Can be used as a `Subscriber`
 */
final class Sink[-In, +Mat](
  override val traversalBuilder: LinearTraversalBuilder,
  override val shape: SinkShape[In])
  extends Graph[SinkShape[In], Mat] {

  // TODO: Debug string
  override def toString: String = s"Sink($shape)"

  /**
   * Transform this Sink by applying a function to each *incoming* upstream element before
   * it is passed to the [[Sink]]
   *
   * '''Backpressures when''' original [[Sink]] backpressures
   *
   * '''Cancels when''' original [[Sink]] cancels
   */
  def contramap[In2](f: In2 => In): Sink[In2, Mat] = Flow.fromFunction(f).toMat(this)(Keep.right)

  /**
   * Connect this `Sink` to a `Source` and run it. The returned value is the materialized value
   * of the `Source`, e.g. the `Subscriber` of a [[Source#subscriber]].
   */
  def runWith[Mat2](source: Graph[SourceShape[In], Mat2])(implicit materializer: Materializer): Mat2 =
    Source.fromGraph(source).to(this).run()

  /**
   * Transform only the materialized value of this Sink, leaving all other properties as they were.
   */
  def mapMaterializedValue[Mat2](f: Mat => Mat2): Sink[In, Mat2] =
    new Sink(
      // the traversal builder is untyped with respect to materialized values, hence the cast
      traversalBuilder.transformMat(f.asInstanceOf[Any => Any]),
      shape)

  /**
   * Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite
   * of multiple graphs, new attributes on the composite will be less specific than attributes
   * set directly on the individual graphs of the composite.
   */
  override def withAttributes(attr: Attributes): Sink[In, Mat] =
    new Sink(
      traversalBuilder.setAttributes(attr),
      shape)

  /**
   * Add the given attributes to this [[Sink]]. If the specific attribute was already present
   * on this graph this means the added attribute will be more specific than the existing one.
   * If this Sink is a composite of multiple graphs, new attributes on the composite will be
   * less specific than attributes set directly on the individual graphs of the composite.
   */
  override def addAttributes(attr: Attributes): Sink[In, Mat] =
    withAttributes(traversalBuilder.attributes and attr)

  /**
   * Add a ``name`` attribute to this Sink.
   */
  override def named(name: String): Sink[In, Mat] = addAttributes(Attributes.name(name))

  /**
   * Put an asynchronous boundary around this `Sink`
   */
  override def async: Sink[In, Mat] = super.async.asInstanceOf[Sink[In, Mat]]

  /**
   * Put an asynchronous boundary around this `Graph`
   *
   * @param dispatcher Run the graph on this dispatcher
   */
  override def async(dispatcher: String): Sink[In, Mat] =
    super.async(dispatcher).asInstanceOf[Sink[In, Mat]]

  /**
   * Put an asynchronous boundary around this `Graph`
   *
   * @param dispatcher      Run the graph on this dispatcher
   * @param inputBufferSize Set the input buffer to this size for the graph
   */
  override def async(dispatcher: String, inputBufferSize: Int): Sink[In, Mat] =
    super.async(dispatcher, inputBufferSize).asInstanceOf[Sink[In, Mat]]

  /**
   * Converts this Scala DSL element to its Java DSL counterpart.
   */
  def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(this)
}
object Sink {

  /** INTERNAL API */
  def shape[T](name: String): SinkShape[T] = SinkShape(Inlet(name + ".in"))

  /**
   * A graph with the shape of a sink logically is a sink, this method makes
   * it so also in type.
   */
  def fromGraph[T, M](g: Graph[SinkShape[T], M]): Sink[T, M] =
    g match {
      case s: Sink[T, M]         => s
      case s: javadsl.Sink[T, M] => s.asScala
      case g: GraphStageWithMaterializedValue[SinkShape[T], M] =>
        // move these from the stage itself to make the returned sink
        // behave as it is the stage with regards to attributes
        val attrs = g.traversalBuilder.attributes
        val noAttrStage = g.withAttributes(Attributes.none)
        new Sink(
          LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right),
          noAttrStage.shape
        ).withAttributes(attrs)

      case other => new Sink(
        LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right),
        other.shape)
    }

  /**
   * Helper to create [[Sink]] from `Subscriber`.
   */
  def fromSubscriber[T](subscriber: Subscriber[T]): Sink[T, NotUsed] =
    fromGraph(new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape("SubscriberSink")))

  /**
   * A `Sink` that immediately cancels its upstream after materialization.
   */
  def cancelled[T]: Sink[T, NotUsed] =
    fromGraph[Any, NotUsed](new CancelSink(DefaultAttributes.cancelledSink, shape("CancelledSink")))

  /**
   * A `Sink` that materializes into a `Future` of the first value received.
   * If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]].
   * If the stream signals an error before signaling at least a single element, the Future will be failed with the stream's exception.
   *
   * See also [[headOption]].
   */
  def head[T]: Sink[T, Future[T]] =
    Sink.fromGraph(new HeadOptionStage[T]).withAttributes(DefaultAttributes.headSink)
      // throwing inside `map` fails the resulting Future when no element was received
      .mapMaterializedValue(e => e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContexts.sameThreadExecutionContext))

  /**
   * A `Sink` that materializes into a `Future` of the optional first value received.
   * If the stream completes before signaling at least a single element, the value of the Future will be [[None]].
   * If the stream signals an error before signaling at least a single element, the Future will be failed with the stream's exception.
   *
   * See also [[head]].
   */
  def headOption[T]: Sink[T, Future[Option[T]]] =
    Sink.fromGraph(new HeadOptionStage[T]).withAttributes(DefaultAttributes.headOptionSink)

  /**
   * A `Sink` that materializes into a `Future` of the last value received.
   * If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]].
   * If the stream signals an error, the Future will be failed with the stream's exception.
   *
   * See also [[lastOption]].
   */
  def last[T]: Sink[T, Future[T]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastSink)
    // throwing inside `map` fails the resulting Future when no element was received
    .mapMaterializedValue(e => e.map(_.getOrElse(throw new NoSuchElementException("last of empty stream")))(ExecutionContexts.sameThreadExecutionContext))

  /**
   * A `Sink` that materializes into a `Future` of the optional last value received.
   * If the stream completes before signaling at least a single element, the value of the Future will be [[None]].
   * If the stream signals an error, the Future will be failed with the stream's exception.
   *
   * See also [[last]].
   */
  def lastOption[T]: Sink[T, Future[Option[T]]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastOptionSink)

  /**
   * A `Sink` that keeps on collecting incoming elements until upstream terminates.
   * As upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit` (and their variants)
   * may be used to ensure boundedness.
   * Materializes into a `Future` of `Seq[T]` containing all the collected elements.
   * `Seq` is limited to `Int.MaxValue` elements, this Sink will cancel the stream
   * after having received that many elements.
   *
   * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
   */
  def seq[T]: Sink[T, Future[immutable.Seq[T]]] = Sink.fromGraph(new SeqStage[T])

  /**
   * A `Sink` that materializes into a [[org.reactivestreams.Publisher]].
   *
   * If `fanout` is `true`, the materialized `Publisher` will support multiple `Subscriber`s and
   * the size of the `inputBuffer` configured for this stage becomes the maximum number of elements that
   * the fastest [[org.reactivestreams.Subscriber]] can be ahead of the slowest one before slowing
   * the processing down due to back pressure.
   *
   * If `fanout` is `false` then the materialized `Publisher` will only support a single `Subscriber` and
   * reject any additional `Subscriber`s.
   */
  def asPublisher[T](fanout: Boolean): Sink[T, Publisher[T]] =
    fromGraph(
      if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink"))
      else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink")))

  /**
   * A `Sink` that will consume the stream and discard the elements.
   */
  def ignore: Sink[Any, Future[Done]] = fromGraph(GraphStages.IgnoreSink)

  /**
   * A `Sink` that will invoke the given procedure for each received element. The sink is materialized
   * into a [[scala.concurrent.Future]] which will be completed with `Success` when reaching the
   * normal end of the stream, or completed with `Failure` if there is a failure signaled in
   * the stream.
   */
  def foreach[T](f: T => Unit): Sink[T, Future[Done]] =
    Flow[T].map(f).toMat(Sink.ignore)(Keep.right).named("foreachSink")

  /**
   * Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`.
   */
  def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed] =
    Sink.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      // the fan-out stage needs one outlet per sink: `first`, `second` and every element of `rest`
      val d = b.add(strategy(rest.size + 2))

      d.out(0) ~> first
      d.out(1) ~> second

      // wire the remaining sinks to consecutive outlets, then expose the fan-out inlet as the sink shape
      @tailrec def combineRest(idx: Int, i: Iterator[Sink[U, _]]): SinkShape[T] =
        if (i.hasNext) {
          d.out(idx) ~> i.next()
          combineRest(idx + 1, i)
        } else new SinkShape(d.in)

      combineRest(2, rest.iterator)
    })

  /**
   * A `Sink` that will invoke the given function to each of the elements
   * as they pass in. The sink is materialized into a [[scala.concurrent.Future]]
   *
   * If `f` throws an exception and the supervision decision is
   * [[akka.stream.Supervision.Stop]] the `Future` will be completed with failure.
   *
   * If `f` throws an exception and the supervision decision is
   * [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] the
   * element is dropped and the stream continues.
   *
   * See also [[Flow.mapAsyncUnordered]]
   */
  def foreachParallel[T](parallelism: Int)(f: T => Unit)(implicit ec: ExecutionContext): Sink[T, Future[Done]] =
    Flow[T].mapAsyncUnordered(parallelism)(t => Future(f(t))).toMat(Sink.ignore)(Keep.right)

  /**
   * A `Sink` that will invoke the given function for every received element, giving it its previous
   * output (or the given `zero` value) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   *
   * @see [[#foldAsync]]
   */
  def fold[U, T](zero: U)(f: (U, T) => U): Sink[T, Future[U]] =
    Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")

  /**
   * A `Sink` that will invoke the given asynchronous function for every received element, giving it its previous
   * output (or the given `zero` value) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   *
   * @see [[#fold]]
   */
  def foldAsync[U, T](zero: U)(f: (U, T) => Future[U]): Sink[T, Future[U]] = Flow[T].foldAsync(zero)(f).toMat(Sink.head)(Keep.right).named("foldAsyncSink")

  /**
   * A `Sink` that will invoke the given function for every received element, giving it its previous
   * output (from the second element) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   *
   * If the stream is empty (i.e. completes before signalling any elements),
   * the reduce stage will fail its downstream with a [[NoSuchElementException]],
   * which is semantically in-line with what Scala's standard library collections
   * do in such situations.
   *
   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
   */
  def reduce[T](f: (T, T) => T): Sink[T, Future[T]] =
    Flow[T].reduce(f).toMat(Sink.head)(Keep.right).named("reduceSink")

  /**
   * A `Sink` that when the flow is completed, either through a failure or normal
   * completion, apply the provided function with [[scala.util.Success]]
   * or [[scala.util.Failure]].
   */
  def onComplete[T](callback: Try[Done] => Unit): Sink[T, NotUsed] = {

    def newOnCompleteStage(): GraphStage[FlowShape[T, NotUsed]] = {
      new GraphStage[FlowShape[T, NotUsed]] {

        val in = Inlet[T]("in")
        val out = Outlet[NotUsed]("out")
        override val shape = FlowShape.of(in, out)

        override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
          new GraphStageLogic(shape) with InHandler with OutHandler {

            // set once `callback` has been invoked for a regular completion or failure,
            // so that `postStop` does not invoke it a second time
            var completionSignalled = false

            // elements are never emitted downstream; every push just requests the next element
            override def onPush(): Unit = pull(in)

            override def onPull(): Unit = pull(in)

            override def onUpstreamFailure(cause: Throwable): Unit = {
              callback(Failure(cause))
              completionSignalled = true
              failStage(cause)
            }

            override def onUpstreamFinish(): Unit = {
              callback(Success(Done))
              completionSignalled = true
              completeStage()
            }

            override def postStop(): Unit = {
              // the stage stopped without upstream completing or failing: report it as a failure
              if (!completionSignalled) callback(Failure(new AbruptStageTerminationException(this)))
            }

            setHandlers(in, out, this)
          }
      }
    }

    Flow[T].via(newOnCompleteStage()).to(Sink.ignore).named("onCompleteSink")
  }

  /**
   * Sends the elements of the stream to the given `ActorRef`.
   * If the target actor terminates the stream will be canceled.
   * When the stream is completed successfully the given `onCompleteMessage`
   * will be sent to the destination actor.
   * When the stream is completed with failure a [[akka.actor.Status.Failure]]
   * message will be sent to the destination actor.
   *
   * It will request at most `maxInputBufferSize` number of elements from
   * upstream, but there is no back-pressure signal from the destination actor,
   * i.e. if the actor is not consuming the messages fast enough the mailbox
   * of the actor will grow. For potentially slow consumer actors it is recommended
   * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
   * limiting stage in front of this `Sink`.
   */
  def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] =
    fromGraph(new ActorRefSink(ref, onCompleteMessage, DefaultAttributes.actorRefSink, shape("ActorRefSink")))

  /**
   * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
   * First element is always `onInitMessage`, then stream is waiting for acknowledgement message
   * `ackMessage` from the given actor which means that it is ready to process
   * elements. It also requires `ackMessage` message after each stream element
   * to make backpressure work.
   *
   * If the target actor terminates the stream will be canceled.
   * When the stream is completed successfully the given `onCompleteMessage`
   * will be sent to the destination actor.
   * When the stream is completed with failure - result of `onFailureMessage(throwable)`
   * function will be sent to the destination actor.
   */
  def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
                         onFailureMessage: (Throwable) => Any = Status.Failure): Sink[T, NotUsed] =
    Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage))

  /**
   * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
   * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must
   * be [[akka.stream.actor.ActorSubscriber]].
   *
   * @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
   */
  @deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
  def actorSubscriber[T](props: Props): Sink[T, ActorRef] = {
    require(classOf[ActorSubscriber].isAssignableFrom(props.actorClass()), "Actor must be ActorSubscriber")
    fromGraph(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink")))
  }

  /**
   * Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueue]].
   * [[akka.stream.scaladsl.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``.
   * `Future` completes when element is available.
   *
   * Before calling pull method second time you need to wait until previous Future completes.
   * Pull returns Failed future with ''IllegalStateException'' if previous future has not yet completed.
   *
   * `Sink` will request at most number of elements equal to size of `inputBuffer` from
   * upstream and then stop back pressure. You can configure size of input
   * buffer by using [[Sink.withAttributes]] method.
   *
   * For stream completion you need to pull all elements from [[akka.stream.scaladsl.SinkQueue]] including last None
   * as completion marker
   *
   * See also [[akka.stream.scaladsl.SinkQueueWithCancel]]
   */
  def queue[T](): Sink[T, SinkQueueWithCancel[T]] =
    Sink.fromGraph(new QueueSink())

  /**
   * Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements,
   * because of completion or error.
   *
   * If `sinkFactory` throws an exception and the supervision decision is
   * [[akka.stream.Supervision.Stop]] the `Future` will be completed with failure. For all other supervision options it will
   * try to create sink with next element
   *
   * `fallback` will be executed when there were no elements and completion is received from upstream.
   *
   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
   */
  def lazyInit[T, M](sinkFactory: T => Future[Sink[T, M]], fallback: () => M): Sink[T, Future[M]] =
    Sink.fromGraph(new LazySink(sinkFactory, fallback))

}