pekko/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
Konrad Malawski f9911facc0 +str #24812 fix signature of monitor()
* make monitor be a keep.both by default
2018-09-25 09:29:51 +02:00

2147 lines
97 KiB
Scala
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.javadsl
import akka.NotUsed
import akka.event.LoggingAdapter
import akka.japi.{ Pair, Util, function }
import akka.stream._
import akka.util.ConstantFun
import akka.util.JavaDurationConverters._
import scala.collection.JavaConverters._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.FiniteDuration
import java.util.Comparator
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
import scala.reflect.ClassTag
object SubSource {

  /**
   * Upcast a stream of elements to a stream of supertypes of that element. Useful in combination with
   * fan-in operators where you do not want to pay the cost of casting each element in a `map`.
   *
   * @param source the sub-source whose element type should be widened from `T` to its supertype `U`
   * @return the same instance, statically typed as a sub-source of `U`
   */
  def upcast[U, T <: U, Mat](source: SubSource[T, Mat]): SubSource[U, Mat] =
    // Only the static type changes; the cast performs no per-element work.
    source.asInstanceOf[SubSource[U, Mat]]
}
/**
* A “stream of streams” sub-flow of data elements, e.g. produced by `groupBy`.
* SubFlows cannot contribute to the super-flow's materialized value since they
* are materialized later, during the runtime of the flow graph processing.
*/
class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[Out, Mat]#Repr, scaladsl.RunnableGraph[Mat]]) {
/** Converts this Flow to its Scala DSL counterpart */
def asScala: scaladsl.SubFlow[Out, Mat, scaladsl.Source[Out, Mat]#Repr, scaladsl.RunnableGraph[Mat]] @uncheckedVariance = {
  // No conversion work is needed: the wrapped delegate already is the Scala DSL sub-flow.
  delegate
}
/**
* Flatten the sub-flows back into the super-source by performing a merge
* without parallelism limit (i.e. having an unbounded number of sub-flows
* active concurrently).
*
* This is identical in effect to `mergeSubstreamsWithParallelism(Integer.MAX_VALUE)`.
*/
def mergeSubstreams(): Source[Out, Mat] = {
  // Merge through the Scala DSL delegate, then re-wrap the result for the Java DSL.
  val merged = delegate.mergeSubstreams
  new Source(merged)
}
/**
* Flatten the sub-flows back into the super-source by performing a merge
* with the given parallelism limit. This means that only up to `parallelism`
* substreams will be executed at any given time. Substreams that are not
* yet executed are also not materialized, meaning that back-pressure will
* be exerted at the operator that creates the substreams when the parallelism
* limit is reached.
*/
def mergeSubstreamsWithParallelism(parallelism: Int): Source[Out, Mat] = {
  // Delegate the bounded merge to the Scala DSL and wrap the resulting source.
  val merged = delegate.mergeSubstreamsWithParallelism(parallelism)
  new Source(merged)
}
/**
* Flatten the sub-flows back into the super-source by concatenating them.
* This is usually a bad idea when combined with `groupBy` since it can
* easily lead to deadlock—the concatenation does not consume from the second
* substream until the first has finished and the `groupBy` operator will get
* back-pressure from the second stream.
*
* This is identical in effect to `mergeSubstreamsWithParallelism(1)`.
*/
def concatSubstreams(): Source[Out, Mat] = {
  // Concatenate through the Scala DSL delegate, then re-wrap the result for the Java DSL.
  val concatenated = delegate.concatSubstreams
  new Source(concatenated)
}
/**
* Transform this [[SubSource]] by appending the given processing steps.
* {{{
* +----------------------------+
* | Resulting Source |
* | |
* | +------+ +------+ |
* | | | | | |
* | | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The materialized value of the combined [[Flow]] will be the materialized
* value of the current flow (ignoring the other Flow's value), use
* [[Flow#viaMat viaMat]] if a different strategy is needed.
*/
def via[T, M](flow: Graph[FlowShape[Out, T], M]): SubSource[T, Mat] = {
  // Append the given flow on the Scala DSL delegate and wrap the extended sub-flow again.
  val extended = delegate.via(flow)
  new SubSource(extended)
}
/**
* Connect this [[SubSource]] to a [[Sink]], concatenating the processing steps of both.
* This means that all sub-flows that result from the previous sub-stream operator
* will be attached to the given sink.
* {{{
* +----------------------------+
* | Resulting RunnableGraph |
* | |
* | +------+ +------+ |
* | | | | | |
* | | this | ~Out~> | sink | |
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
*/
def to(sink: Graph[SinkShape[Out], _]): RunnableGraph[Mat] = {
  // Attach the sink on the Scala side first, then adapt the closed graph for the Java DSL.
  val closed = delegate.to(sink)
  RunnableGraph.fromGraph(closed)
}
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def map[T](f: function.Function[Out, T]): SubSource[T, Mat] = {
  // Bridge the Java functional interface to the Scala function the delegate expects.
  new SubSource(delegate.map(elem => f.apply(elem)))
}
/**
* This is a simplified version of `wireTap(Sink)` that takes only a simple procedure.
* Elements will be passed into this "side channel" function, and any of its results will be ignored.
*
* If the wire-tap operation is slow (it backpressures), elements that would've been sent to it will be dropped instead.
*
* This operation is useful for inspecting the passed through element, usually by means of side-effecting
* operations (such as `println`, or emitting metrics), for each element without having to modify it.
*
* For logging signals (elements, completion, error) consider using the [[log]] operator instead,
* along with appropriate `ActorAttributes.logLevels`.
*
* '''Emits when''' upstream emits an element; the same element will be passed to the attached function,
* as well as to the downstream operator
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def wireTap(f: function.Procedure[Out]): SubSource[Out, Mat] = {
  // Each element is handed to the procedure as a side channel; its result is Unit.
  new SubSource(delegate.wireTap((elem: Out) => f.apply(elem)))
}
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream.
*
* Make sure that the `Iterable` is immutable or at least not modified after
* being used as an output sequence. Otherwise the stream may fail with
* `ConcurrentModificationException` or other more subtle errors may occur.
*
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection
*
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection
*
* '''Completes when''' upstream completes and all remaining elements has been emitted
*
* '''Cancels when''' downstream cancels
*/
def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubSource[T, Mat] =
  // Convert the Java Iterable returned by `f` to an immutable Scala sequence,
  // which is the collection type the Scala DSL operator works with.
  new SubSource(delegate.mapConcat { elem => Util.immutableSeq(f(elem)) })
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
* which is enabled by creating the transformation function anew for every materialization —
* the returned function will typically close over mutable objects to store state between
* invocations. For the stateless variant see [[#mapConcat]].
*
* Make sure that the `Iterable` is immutable or at least not modified after
* being used as an output sequence. Otherwise the stream may fail with
* `ConcurrentModificationException` or other more subtle errors may occur.
*
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection
*
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection
*
* '''Completes when''' upstream completes and all remaining elements has been emitted
*
* '''Cancels when''' downstream cancels
*/
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubSource[T, Mat] =
  new SubSource(delegate.statefulMapConcat { () =>
    // The creator is invoked each time this factory runs, so every invocation
    // gets its own function instance (and whatever state that function closes over).
    val fun = f.create()
    elem => Util.immutableSeq(fun(elem))
  })
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step. The function returns a `CompletionStage` and the
* value of that future will be emitted downstream. The number of CompletionStages
* that shall run in parallel is given as the first argument to ``mapAsync``.
* These CompletionStages may complete in any order, but the elements that
* are emitted downstream are in the same order as received from upstream.
*
* If the function `f` throws an exception or if the `CompletionStage` is completed
* with failure and the supervision decision is [[akka.stream.Supervision#stop]]
* the stream will be completed with failure.
*
* If the function `f` throws an exception or if the `CompletionStage` is completed
* with failure and the supervision decision is [[akka.stream.Supervision#resume]] or
* [[akka.stream.Supervision#restart]] the element is dropped and the stream continues.
*
* The function `f` is always invoked on the elements in the order they arrive.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the CompletionStage returned by the provided function finishes for the next element in sequence
*
* '''Backpressures when''' the number of CompletionStages reaches the configured parallelism and the downstream
* backpressures or the first CompletionStage is not completed
*
* '''Completes when''' upstream completes and all CompletionStages has been completed and all elements has been emitted
*
* '''Cancels when''' downstream cancels
*
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubSource[T, Mat] =
  // The CompletionStage produced by `f` is converted to a Scala Future for the Scala DSL operator.
  new SubSource(delegate.mapAsync(parallelism)(x => f(x).toScala))
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step. The function returns a `CompletionStage` and the
* value of that future will be emitted downstream. The number of CompletionStages
* that shall run in parallel is given as the first argument to ``mapAsyncUnordered``.
* Each processed element will be emitted downstream as soon as it is ready, i.e. it is possible
* that the elements are not emitted downstream in the same order as received from upstream.
*
* If the function `f` throws an exception or if the `CompletionStage` is completed
* with failure and the supervision decision is [[akka.stream.Supervision#stop]]
* the stream will be completed with failure.
*
* If the function `f` throws an exception or if the `CompletionStage` is completed
* with failure and the supervision decision is [[akka.stream.Supervision#resume]] or
* [[akka.stream.Supervision#restart]] the element is dropped and the stream continues.
*
* The function `f` is always invoked on the elements in the order they arrive (even though the result of the futures
* returned by `f` might be emitted in a different order).
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' any of the CompletionStage returned by the provided function complete
*
* '''Backpressures when''' the number of CompletionStage reaches the configured parallelism and the downstream backpressures
*
* '''Completes when''' upstream completes and all CompletionStage has been completed and all elements has been emitted
*
* '''Cancels when''' downstream cancels
*
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubSource[T, Mat] =
  // The CompletionStage produced by `f` is converted to a Scala Future for the Scala DSL operator.
  new SubSource(delegate.mapAsyncUnordered(parallelism)(x => f(x).toScala))
/**
* Only pass on those elements that satisfy the given predicate.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the given predicate returns true for the element
*
* '''Backpressures when''' the given predicate returns true for the element and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def filter(p: function.Predicate[Out]): SubSource[Out, Mat] = {
  // Adapt the Java predicate to a Scala function for the delegate.
  new SubSource(delegate.filter(elem => p.test(elem)))
}
/**
* Only pass on those elements that do NOT satisfy the given predicate.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the given predicate returns false for the element
*
* '''Backpressures when''' the given predicate returns false for the element and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def filterNot(p: function.Predicate[Out]): SubSource[Out, Mat] = {
  // Adapt the Java predicate to a Scala function for the delegate.
  new SubSource(delegate.filterNot(elem => p.test(elem)))
}
/**
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step.
* Non-matching elements are filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the provided partial function is defined for the element
*
* '''Backpressures when''' the partial function is defined for the element and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def collect[T](pf: PartialFunction[Out, T]): SubSource[T, Mat] = {
  // The partial function is already a Scala type, so it is passed through untouched.
  val collected = delegate.collect(pf)
  new SubSource(collected)
}
/**
* Transform this stream by testing the type of each of the elements
* on which the element is an instance of the provided type as they pass through this processing step.
* Non-matching elements are filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the element is an instance of the provided type
*
* '''Backpressures when''' the element is an instance of the provided type and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def collectType[T](clazz: Class[T]): javadsl.SubSource[T, Mat] = {
  // Build the ClassTag from the runtime class and pass it explicitly in place of the implicit.
  val tag = ClassTag[T](clazz)
  new SubSource(delegate.collectType[T](tag))
}
/**
* Chunk up this stream into groups of the given size, with the last group
* possibly smaller than requested due to end-of-stream.
*
* `n` must be positive, otherwise IllegalArgumentException is thrown.
*
* '''Emits when''' the specified number of elements has been accumulated or upstream completed
*
* '''Backpressures when''' a group has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def grouped(n: Int): SubSource[java.util.List[Out @uncheckedVariance], Mat] = {
  // TODO optimize to one step
  // Groups are produced as Scala sequences and then exposed as java.util.List.
  new SubSource(delegate.grouped(n).map(group => group.asJava))
}
/**
* Apply a sliding window over the stream and return the windows as groups of elements, with the last group
* possibly smaller than requested due to end-of-stream.
*
* `n` must be positive, otherwise IllegalArgumentException is thrown.
* `step` must be positive, otherwise IllegalArgumentException is thrown.
*
* '''Emits when''' enough elements have been collected within the window or upstream completed
*
* '''Backpressures when''' a window has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
/**
* Ensure stream boundedness by limiting the number of elements from upstream.
* If the number of incoming elements exceeds `n`, it will signal
* upstream failure `StreamLimitException` downstream.
*
* Due to input buffering some elements may have been
* requested from upstream publishers that will then not be processed downstream
* of this step.
*
* The stream will be completed without producing any elements if `n` is zero
* or negative.
*
* '''Emits when''' the specified number of elements to take has not yet been reached
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the defined number of elements has been taken or upstream completes
*
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
*
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
*/
def limit(n: Int): javadsl.SubSource[Out, Mat] = {
  // Apply the element-count limit on the Scala DSL delegate and wrap the result.
  val limited = delegate.limit(n)
  new SubSource(limited)
}
/**
* Ensure stream boundedness by evaluating the cost of incoming elements
* using a cost function. Exactly how many elements will be allowed to travel downstream depends on the
* evaluated cost of each element. If the accumulated cost exceeds `n`, it will signal
* upstream failure `StreamLimitException` downstream.
*
* Due to input buffering some elements may have been
* requested from upstream publishers that will then not be processed downstream
* of this step.
*
* The stream will be completed without producing any elements if `n` is zero
* or negative.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the specified number of elements to take has not yet been reached
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the defined number of elements has been taken or upstream completes
*
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
*
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
*/
def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.SubSource[Out, Mat] =
  // The boxed java.lang.Long cost is unboxed to the Scala Long the delegate expects.
  new SubSource(delegate.limitWeighted(n)(elem => costFn.apply(elem)))
/**
 * Apply a sliding window over the stream and return the windows as groups of elements, with the last group
 * possibly smaller than requested due to end-of-stream.
 *
 * `n` must be positive, otherwise IllegalArgumentException is thrown.
 * `step` must be positive, otherwise IllegalArgumentException is thrown.
 *
 * '''Emits when''' enough elements have been collected within the window or upstream completed
 *
 * '''Backpressures when''' a window has been assembled and downstream backpressures
 *
 * '''Completes when''' upstream completes
 *
 * '''Cancels when''' downstream cancels
 *
 * @param n the window size
 * @param step the number of elements the window advances by; defaults to 1
 */
def sliding(n: Int, step: Int = 1): SubSource[java.util.List[Out @uncheckedVariance], Mat] =
  new SubSource(delegate.sliding(n, step).map(_.asJava)) // TODO optimize to one step
/**
* Similar to `fold` but is not a terminal operation,
* emits its current value which starts at `zero` and then
* applies the current and next value to the given function `f`,
* emitting the next current value.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision#restart]] current value starts at `zero` again
* the stream will continue.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* Note that the `zero` value must be immutable.
*
* '''Emits when''' the function scanning the element returns a new element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] = {
  // Adapt the Java two-argument function to a Scala (accumulator, element) function.
  new SubSource(delegate.scan(zero)((acc, elem) => f.apply(acc, elem)))
}
/**
* Similar to `scan` but with an asynchronous function,
* emits its current value which starts at `zero` and then
* applies the current and next value to the given function `f`,
* emitting a `Future` that resolves to the next current value.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Restart]] current value starts at `zero` again
* the stream will continue.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Resume]] current value starts at the previous
* current value, or zero when it doesn't have one, and the stream will continue.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* Note that the `zero` value must be immutable.
*
* '''Emits when''' the future returned by `f` completes
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and the last future returned by `f` completes
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.scan]]
*/
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
  // The CompletionStage produced by `f` is converted to a Scala Future for the Scala DSL operator.
  new SubSource(delegate.scanAsync(zero) { (out, in) => f(out, in).toScala })
/**
* Similar to `scan` but only emits its result when the upstream completes,
* after which it also completes. Applies the given function `f` towards its current and next value,
* yielding the next current value.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision#restart]] current value starts at `zero` again
* the stream will continue.
*
* Note that the `zero` value must be immutable.
*
* '''Emits when''' upstream completes
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] = {
  // Adapt the Java two-argument function to a Scala (accumulator, element) function.
  new SubSource(delegate.fold(zero)((acc, elem) => f.apply(acc, elem)))
}
/**
* Similar to `fold` but with an asynchronous function.
* Applies the given function towards its current and next value,
* yielding the next current value.
*
* If the function `f` returns a failure and the supervision decision is
* [[akka.stream.Supervision.Restart]] current value starts at `zero` again
* the stream will continue.
*
* Note that the `zero` value must be immutable.
*
* '''Emits when''' upstream completes
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
  // The CompletionStage produced by `f` is converted to a Scala Future for the Scala DSL operator.
  new SubSource(delegate.foldAsync(zero) { (out, in) => f(out, in).toScala })
/**
* Similar to `fold` but uses first element as zero element.
* Applies the given function towards its current and next value,
* yielding the next current value.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' upstream completes
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def reduce(f: function.Function2[Out, Out, Out @uncheckedVariance]): SubSource[Out, Mat] = {
  // Parameter types are spelled out so the element type of the reduction stays `Out`.
  new SubSource(delegate.reduce((left: Out, right: Out) => f.apply(left, right)))
}
/**
* Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
* injects a separator between a List's elements.
*
* Additionally can inject start and end marker elements to stream.
*
* Examples:
*
* {{{
* Source<Integer, ?> nums = Source.from(Arrays.asList(0, 1, 2, 3));
* nums.intersperse(","); // 0 , 1 , 2 , 3
* nums.intersperse("[", ",", "]"); // [ 0 , 1 , 2 , 3 ]
* }}}
*
* In case you want to only prepend or only append an element (yet still use the `intersperse` feature
* to inject a separator between elements), you may want to use the following pattern instead of the 3-argument
* version of intersperse (see [[Source.concat]] for semantics details):
*
* {{{
* Source.single(">> ").concat(flow.intersperse(","))
* flow.intersperse(",").concat(Source.single("END"))
* }}}
*
* '''Emits when''' upstream emits (or before with the `start` element if provided)
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def intersperse(start: Out, inject: Out, end: Out): SubSource[Out, Mat] = {
  // Three-argument form: start marker, separator and end marker are all forwarded to the delegate.
  val interspersed = delegate.intersperse(start, inject, end)
  new SubSource(interspersed)
}
/**
* Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
* injects a separator between a List's elements.
*
* Additionally can inject start and end marker elements to stream.
*
* Examples:
*
* {{{
* Source<Integer, ?> nums = Source.from(Arrays.asList(0, 1, 2, 3));
* nums.intersperse(","); // 0 , 1 , 2 , 3
* nums.intersperse("[", ",", "]"); // [ 0 , 1 , 2 , 3 ]
* }}}
*
* '''Emits when''' upstream emits (or before with the `start` element if provided)
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def intersperse(inject: Out): SubSource[Out, Mat] = {
  // Single-argument form: only the separator is forwarded to the delegate.
  val interspersed = delegate.intersperse(inject)
  new SubSource(interspersed)
}
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the given number of elements, whatever happens first.
* Empty groups will not be emitted if no elements are received from upstream.
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or `n` elements is buffered
*
* '''Backpressures when''' downstream backpressures, and there are `n+1` buffered elements
*
* '''Completes when''' upstream completes (emits last group)
*
* '''Cancels when''' downstream completes
*
* `n` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def groupedWithin(n: Int, d: FiniteDuration): SubSource[java.util.List[Out @uncheckedVariance], Mat] = {
  // TODO optimize to one step
  // Groups are produced as Scala sequences and then exposed as java.util.List.
  new SubSource(delegate.groupedWithin(n, d).map(group => group.asJava))
}
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the given number of elements, whatever happens first.
* Empty groups will not be emitted if no elements are received from upstream.
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or `n` elements is buffered
*
* '''Backpressures when''' downstream backpressures, and there are `n+1` buffered elements
*
* '''Completes when''' upstream completes (emits last group)
*
* '''Cancels when''' downstream completes
*
* `n` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
def groupedWithin(n: Int, d: java.time.Duration): SubSource[java.util.List[Out @uncheckedVariance], Mat] = {
  // Convert the Java duration once, then reuse the FiniteDuration-based overload.
  val window: FiniteDuration = d.asScala
  groupedWithin(n, window)
}
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the weight of the elements, whatever happens first.
* Empty groups will not be emitted if no elements are received from upstream.
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight`
*
* '''Completes when''' upstream completes (emits last group)
*
* '''Cancels when''' downstream completes
*
* `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: FiniteDuration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] = {
  // The boxed cost is unboxed for the delegate; groups are exposed as java.util.List.
  val groups = delegate.groupedWeightedWithin(maxWeight, d)(elem => costFn.apply(elem))
  new SubSource(groups.map(group => group.asJava))
}
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the weight of the elements, whatever happens first.
* Empty groups will not be emitted if no elements are received from upstream.
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight`
*
* '''Completes when''' upstream completes (emits last group)
*
* '''Cancels when''' downstream completes
*
* `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] = {
  // Convert the Java duration once, then reuse the FiniteDuration-based overload.
  val window: FiniteDuration = d.asScala
  groupedWeightedWithin(maxWeight, costFn, window)
}
/**
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.
*
* '''Emits when''' the specified number of elements has been dropped already
*
* '''Backpressures when''' the specified number of elements has been dropped and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def drop(n: Long): SubSource[Out, Mat] = {
  // Forward the element count to the Scala DSL delegate and wrap the result.
  val remaining = delegate.drop(n)
  new SubSource(remaining)
}
/**
* Discard the elements received within the given duration at beginning of the stream.
*
* '''Emits when''' the specified time elapsed and a new upstream element arrives
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def dropWithin(d: FiniteDuration): SubSource[Out, Mat] = {
  // Forward the duration to the Scala DSL delegate and wrap the result.
  val remaining = delegate.dropWithin(d)
  new SubSource(remaining)
}
/**
* Discard the elements received within the given duration at beginning of the stream.
*
* '''Emits when''' the specified time elapsed and a new upstream element arrives
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def dropWithin(d: java.time.Duration): SubSource[Out, Mat] = {
  // Convert the Java duration once, then reuse the FiniteDuration-based overload.
  val window: FiniteDuration = d.asScala
  dropWithin(window)
}
/**
* Terminate processing (and cancel the upstream publisher) after predicate
* returns false for the first time,
* Due to input buffering some elements may have been requested from upstream publishers
* that will then not be processed downstream of this step.
*
* The stream will be completed without producing any elements if predicate is false for
* the first stream element.
*
* '''Emits when''' the predicate is true
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive`) or upstream completes
*
* '''Cancels when''' predicate returned false or downstream cancels
*/
def takeWhile(p: function.Predicate[Out]): SubSource[Out, Mat] = {
  // Non-inclusive variant: delegates to the two-argument overload with the flag switched off.
  takeWhile(p, inclusive = false)
}
/**
* Terminate processing (and cancel the upstream publisher) after predicate
* returns false for the first time, including the first failed element iff inclusive is true
* Due to input buffering some elements may have been requested from upstream publishers
* that will then not be processed downstream of this step.
*
* The stream will be completed without producing any elements if predicate is false for
* the first stream element.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the predicate is true
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive`) or upstream completes
*
* '''Cancels when''' predicate returned false or downstream cancels
*/
def takeWhile(p: function.Predicate[Out], inclusive: Boolean): SubSource[Out, Mat] = {
  // Adapt the Java predicate to a Scala function; the inclusive flag is forwarded unchanged.
  new SubSource(delegate.takeWhile((elem: Out) => p.test(elem), inclusive))
}
/**
* Discard elements at the beginning of the stream while predicate is true.
* All elements will be taken after predicate returns false first time.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' predicate returned false and for all following stream elements
*
* '''Backpressures when''' predicate returned false and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def dropWhile(p: function.Predicate[Out]): SubSource[Out, Mat] = {
  // Adapt the Java predicate to a Scala function before handing it to the Scala DSL.
  val remaining = delegate.dropWhile((elem: Out) => p.test(elem))
  new SubSource(remaining)
}
/**
* Shifts elements emission in time by a specified amount. It allows to store elements
* in internal buffer while waiting for next element to be emitted. Depending on the defined
* [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
* there is no space available in the buffer.
*
* Delay precision is 10ms to avoid unnecessary timer scheduling cycles
*
* Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)`
*
* '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
* * EmitEarly - strategy do not wait to emit element if buffer is full
*
* '''Backpressures when''' depending on OverflowStrategy
* * Backpressure - backpressures when buffer is full
* * DropHead, DropTail, DropBuffer - never backpressures
* * Fail - fails the stream if buffer gets full
*
* '''Completes when''' upstream completes and buffered elements have been drained
*
* '''Cancels when''' downstream cancels
*
* @param of time to shift all messages
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): SubSource[Out, Mat] = {
  // Apply the delay on the wrapped Scala sub-flow and re-wrap the result for the Java DSL.
  val delayed = delegate.delay(of, strategy)
  new SubSource(delayed)
}
/**
* Shifts elements emission in time by a specified amount. It allows to store elements
* in internal buffer while waiting for next element to be emitted. Depending on the defined
* [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
* there is no space available in the buffer.
*
* Delay precision is 10ms to avoid unnecessary timer scheduling cycles
*
* Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)`
*
* '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
* * EmitEarly - strategy do not wait to emit element if buffer is full
*
* '''Backpressures when''' depending on OverflowStrategy
* * Backpressure - backpressures when buffer is full
* * DropHead, DropTail, DropBuffer - never backpressures
* * Fail - fails the stream if buffer gets full
*
* '''Completes when''' upstream completes and buffered elements have been drained
*
* '''Cancels when''' downstream cancels
*
* @param of time to shift all messages
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubSource[Out, Mat] = {
  // Convert to a Scala duration, then reuse the FiniteDuration overload.
  val scalaDelay: FiniteDuration = of.asScala
  delay(scalaDelay, strategy)
}
/**
* Recover allows to send last element on failure and gracefully complete the stream
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def recover(pf: PartialFunction[Throwable, Out]): SubSource[Out, Mat] = {
  // The partial function is passed through unchanged; only the result is re-wrapped.
  val recovered = delegate.recover(pf)
  new SubSource(recovered)
}
/**
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
@deprecated("Use recoverWithRetries instead.", "2.4.4")
def recoverWith(pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): SubSource[Out, Mat] = {
  // The partial function is passed through unchanged; only the result is re-wrapped.
  val recovered = delegate.recoverWith(pf)
  new SubSource(recovered)
}
/**
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered up to `attempts` number of times so that each time there is a failure
* it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
* attempt to recover at all.
*
* A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def recoverWithRetries(attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): SubSource[Out, Mat] = {
  // Both arguments are forwarded as-is to the Scala DSL.
  val recovered = delegate.recoverWithRetries(attempts, pf)
  new SubSource(recovered)
}
/**
* While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
* would log the `t2` error.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Similarly to [[recover]] throwing an exception inside `mapError` _will_ be logged.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def mapError(pf: PartialFunction[Throwable, Throwable]): SubSource[Out, Mat] = {
  // The partial function is passed through unchanged; only the result is re-wrapped.
  val remapped = delegate.mapError(pf)
  new SubSource(remapped)
}
/**
* Terminate processing (and cancel the upstream publisher) after the given
* number of elements. Due to input buffering some elements may have been
* requested from upstream publishers that will then not be processed downstream
* of this step.
*
* The stream will be completed without producing any elements if `n` is zero
* or negative.
*
* '''Emits when''' the specified number of elements to take has not yet been reached
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the defined number of elements has been taken or upstream completes
*
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
*/
def take(n: Long): SubSource[Out, Mat] = {
  // Forward the element limit to the Scala DSL and re-wrap the result for the Java DSL.
  val limited = delegate.take(n)
  new SubSource(limited)
}
/**
* Terminate processing (and cancel the upstream publisher) after the given
* duration. Due to input buffering some elements may have been
* requested from upstream publishers that will then not be processed downstream
* of this step.
*
* Note that this can be combined with [[#take]] to limit the number of elements
* within the duration.
*
* '''Emits when''' an upstream element arrives
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or timer fires
*
* '''Cancels when''' downstream cancels or timer fires
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def takeWithin(d: FiniteDuration): SubSource[Out, Mat] = {
  // Forward the time limit to the Scala DSL and re-wrap the result for the Java DSL.
  val limited = delegate.takeWithin(d)
  new SubSource(limited)
}
/**
* Terminate processing (and cancel the upstream publisher) after the given
* duration. Due to input buffering some elements may have been
* requested from upstream publishers that will then not be processed downstream
* of this step.
*
* Note that this can be combined with [[#take]] to limit the number of elements
* within the duration.
*
* '''Emits when''' an upstream element arrives
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or timer fires
*
* '''Cancels when''' downstream cancels or timer fires
*/
def takeWithin(d: java.time.Duration): SubSource[Out, Mat] = {
  // Convert to a Scala duration, then reuse the FiniteDuration overload.
  val scalaDuration: FiniteDuration = d.asScala
  takeWithin(scalaDuration)
}
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream publisher is faster.
*
* This version of conflate allows to derive a seed from the first element and change the aggregated type to be
* different than the input type. See [[Flow.conflate]] for a simpler version that does not change types.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' downstream stops backpressuring and there is a conflated element available
*
* '''Backpressures when''' never
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* see also [[SubSource.conflate]] [[SubSource.batch]] [[SubSource.batchWeighted]]
*
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*
*/
def conflateWithSeed[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] =
  new SubSource(
    // Both Java functional interfaces are adapted to Scala functions via their `apply` methods.
    delegate.conflateWithSeed((first: Out) => seed.apply(first))(
      (acc: S, next: Out) => aggregate.apply(acc, next)))
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream publisher is faster.
*
* This version of conflate does not change the output type of the stream. See [[SubSource.conflateWithSeed]] for a
* more flexible version that can take a seed function and transform elements while rolling up.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' downstream stops backpressuring and there is a conflated element available
*
* '''Backpressures when''' never
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* see also [[SubSource.conflateWithSeed]] [[SubSource.batch]] [[SubSource.batchWeighted]]
*
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*
*/
def conflate(aggregate: function.Function2[Out, Out, Out]): SubSource[Out, Mat] =
  new SubSource(
    // Adapt the Java two-argument function to a Scala function.
    delegate.conflate((acc: Out, next: Out) => aggregate.apply(acc, next)))
/**
* Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches
* until the subscriber is ready to accept them. For example a batch step might store received elements in
* an array up to the allowed max limit if the upstream publisher is faster.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' downstream stops backpressuring and there is an aggregated element available
*
* '''Backpressures when''' there are `max` batched elements and 1 pending element and downstream backpressures
*
* '''Completes when''' upstream completes and there is no batched/pending element waiting
*
* '''Cancels when''' downstream cancels
*
* See also [[SubSource.conflate]], [[SubSource.batchWeighted]]
*
* @param max maximum number of elements to batch before backpressuring upstream (must be positive non-zero)
* @param seed Provides the first state for a batched value using the first unconsumed element as a start
* @param aggregate Takes the currently batched value and the current pending element to produce a new aggregate
*/
def batch[S](max: Long, seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] =
  new SubSource(
    // Both Java functional interfaces are adapted to Scala functions via their `apply` methods.
    delegate.batch(max, (first: Out) => seed.apply(first))(
      (acc: S, next: Out) => aggregate.apply(acc, next)))
/**
* Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches
* until the subscriber is ready to accept them. For example a batch step might concatenate `ByteString`
* elements up to the allowed max limit if the upstream publisher is faster.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* Batching will apply for all elements, even if a single element cost is greater than the total allowed limit.
* In this case, previous batched elements will be emitted, then the "heavy" element will be emitted (after
* being applied with the `seed` function) without batching further elements with it, and then the rest of the
* incoming elements are batched.
*
* '''Emits when''' downstream stops backpressuring and there is a batched element available
*
* '''Backpressures when''' there are `max` weighted batched elements + 1 pending element and downstream backpressures
*
* '''Completes when''' upstream completes and there is no batched/pending element waiting
*
* '''Cancels when''' downstream cancels
*
* See also [[SubSource.conflate]], [[SubSource.batch]]
*
* @param max maximum weight of elements to batch before backpressuring upstream (must be positive non-zero)
* @param costFn a function to compute a single element weight
* @param seed Provides the first state for a batched value using the first unconsumed element as a start
* @param aggregate Takes the currently batched value and the current pending element to produce a new batch
*/
def batchWeighted[S](max: Long, costFn: function.Function[Out, java.lang.Long], seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] =
  new SubSource(
    // `costFn.apply` is left as a method reference so the boxed-to-primitive Long
    // adaptation stays exactly what the compiler chooses for it.
    delegate.batchWeighted(max, costFn.apply, (first: Out) => seed.apply(first))(
      (acc: S, next: Out) => aggregate.apply(acc, next)))
/**
* Allows a faster downstream to progress independently of a slower upstream by extrapolating elements from an older
* element until new element comes from the upstream. For example an expand step might repeat the last element for
* the subscriber until it receives an update from upstream.
*
* This element will never "drop" upstream elements as all elements go through at least one extrapolation step.
* This means that if the upstream is actually faster than the downstream it will be backpressured by the downstream
* subscriber.
*
* Expand does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]].
* Exceptions from the `expander` function will complete the stream with failure.
*
* See also [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" element.
*
* '''Emits when''' downstream stops backpressuring
*
* '''Backpressures when''' downstream backpressures or iterator runs empty
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param expander Takes the current extrapolation state to produce an output element and the next extrapolation
* state.
* @see [[#extrapolate]]
*/
def expand[U](expander: function.Function[Out, java.util.Iterator[U]]): SubSource[U, Mat] =
  // Restored the function-literal arrow: `in expander(in).asScala` is not valid Scala.
  // The Java iterator produced by `expander` is converted to a Scala iterator for the Scala DSL.
  new SubSource(delegate.expand(in => expander(in).asScala))
/**
* Allows a faster downstream to progress independent of a slower upstream.
*
* This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream
* signals demand.
*
* Extrapolate does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]].
* Exceptions from the `extrapolate` function will complete the stream with failure.
*
* See also [[#expand]] for a version that can overwrite the original element.
*
* '''Emits when''' downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR
* `extrapolate` is non-empty and applicable
*
* '''Backpressures when''' downstream backpressures or current `extrapolate` runs empty
*
* '''Completes when''' upstream completes and current `extrapolate` runs empty
*
* '''Cancels when''' downstream cancels
*
* @param extrapolator takes the current upstream element and provides a sequence of "extrapolated" elements based
* on the original, to be emitted in case downstream signals demand.
* @see [[#expand]]
*/
def extrapolate(extrapolator: function.Function[Out @uncheckedVariance, java.util.Iterator[Out @uncheckedVariance]]): SubSource[Out, Mat] =
  // Restored the function-literal arrow: `in extrapolator(in).asScala` is not valid Scala.
  // The Java iterator produced by `extrapolator` is converted to a Scala iterator for the Scala DSL.
  new SubSource(delegate.extrapolate(in => extrapolator(in).asScala))
/**
* Allows a faster downstream to progress independent of a slower upstream.
*
* This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream
* signals demand.
*
* Extrapolate does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]].
* Exceptions from the `extrapolate` function will complete the stream with failure.
*
* See also [[#expand]] for a version that can overwrite the original element.
*
* '''Emits when''' downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR
* `extrapolate` is non-empty and applicable
*
* '''Backpressures when''' downstream backpressures or current `extrapolate` runs empty
*
* '''Completes when''' upstream completes and current `extrapolate` runs empty
*
* '''Cancels when''' downstream cancels
*
* @param extrapolator takes the current upstream element and provides a sequence of "extrapolated" elements based
* on the original, to be emitted in case downstream signals demand.
* @param initial the initial element to be emitted, in case upstream is able to stall the entire stream.
* @see [[#expand]]
*/
def extrapolate(extrapolator: function.Function[Out @uncheckedVariance, java.util.Iterator[Out @uncheckedVariance]], initial: Out @uncheckedVariance): SubSource[Out, Mat] =
  // Restored the function-literal arrow: `in extrapolator(in).asScala` is not valid Scala.
  // `initial` is wrapped in `Some` because the Scala DSL takes the initial element as an Option.
  new SubSource(delegate.extrapolate(in => extrapolator(in).asScala, Some(initial)))
/**
* Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full.
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements or backpressure the upstream if
* there is no space available
*
* '''Emits when''' downstream stops backpressuring and there is a pending element in the buffer
*
* '''Backpressures when''' downstream backpressures or depending on OverflowStrategy:
* <ul>
* <li>Backpressure - backpressures when buffer is full</li>
* <li>DropHead, DropTail, DropBuffer - never backpressures</li>
* <li>Fail - fails the stream if buffer gets full</li>
* </ul>
*
* '''Completes when''' upstream completes and buffered elements have been drained
*
* '''Cancels when''' downstream cancels
*
* @param size The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def buffer(size: Int, overflowStrategy: OverflowStrategy): SubSource[Out, Mat] = {
  // Both arguments are forwarded as-is to the Scala DSL.
  val buffered = delegate.buffer(size, overflowStrategy)
  new SubSource(buffered)
}
/**
* Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements)
* and returns a pair containing a strict sequence of the taken element
* and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair
* of an empty collection and a stream containing the whole upstream unchanged.
*
* In case of an upstream error, depending on the current state
* - the master stream signals the error if less than `n` elements has been seen, and therefore the substream
* has not yet been emitted
* - the tail substream signals the error after the prefix and tail has been emitted by the main stream
* (at that point the main stream has already completed)
*
* '''Emits when''' the configured number of prefix elements are available. Emits this prefix, and the rest
* as a substream
*
* '''Backpressures when''' downstream backpressures or substream backpressures
*
* '''Completes when''' prefix elements has been consumed and substream has been consumed
*
* '''Cancels when''' downstream cancels or substream cancels
*/
def prefixAndTail(n: Int): SubSource[akka.japi.Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance, NotUsed]], Mat] =
  // Restored the arrow of the pattern-matching lambda: `case (taken, tail) akka.japi.Pair(...)` is not valid Scala.
  // Each Scala tuple is turned into a Java-friendly Pair holding a java.util.List and a javadsl Source.
  new SubSource(delegate.prefixAndTail(n).map { case (taken, tail) => akka.japi.Pair(taken.asJava, tail.asJava) })
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by concatenation,
* fully consuming one Source after the other.
*
* '''Emits when''' a currently consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*
*/
def flatMapConcat[T, M](f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): SubSource[T, Mat] =
  // Restored the function-literal arrow: `x f(x)` is not valid Scala.
  new SubSource(delegate.flatMapConcat(x => f(x)))
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by merging, where at most `breadth`
* substreams are being consumed at any given time.
*
* '''Emits when''' a currently consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*/
def flatMapMerge[T, M](breadth: Int, f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): SubSource[T, Mat] =
  // Restored the function-literal arrow: `o f(o)` is not valid Scala.
  new SubSource(delegate.flatMapMerge(breadth, o => f(o)))
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flow's input is exhausted and all result elements have been generated,
* the Source's elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' given [[Source]] completes
*
* '''Cancels when''' downstream cancels
*/
def concat[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] = {
  // Forward the graph to the Scala DSL and re-wrap the result for the Java DSL.
  val concatenated = delegate.concat(that)
  new SubSource(concatenated)
}
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' this [[Flow]] completes
*
* '''Cancels when''' downstream cancels
*/
def prepend[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] = {
  // Forward the graph to the Scala DSL and re-wrap the result for the Java DSL.
  val prepended = delegate.prepend(that)
  new SubSource(prepended)
}
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
* will be cancelled.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes or it gets
* cancelled.
*
* On errors the operator is failed regardless of source of the error.
*
* '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element
* is available from the second stream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes
* without emitting and the secondary stream already has completed or when the secondary stream completes
*
* '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes
* by from this stream.
*/
def orElse[M](secondary: Graph[SourceShape[Out], M]): javadsl.SubSource[Out, Mat] = {
  // Forward the fallback graph to the Scala DSL and re-wrap the result for the Java DSL.
  val withFallback = delegate.orElse(secondary)
  new SubSource(withFallback)
}
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass
* through will also be sent to the [[Sink]].
*
* '''Emits when''' element is available and demand exists both from the Sink and the downstream.
*
* '''Backpressures when''' downstream or Sink backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream or Sink cancels
*/
def alsoTo(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat] = {
  // Forward the sink graph to the Scala DSL and re-wrap the result for the Java DSL.
  val tapped = delegate.alsoTo(that)
  new SubSource(tapped)
}
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`.
*
* '''Emits when''' an element is available from the input and the chosen output has demand
*
* '''Backpressures when''' the currently chosen output back-pressures
*
* '''Completes when''' upstream completes and no output is pending
*
* '''Cancels when''' any of the downstreams cancel
*/
def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): SubSource[Out, Mat] = {
  // Adapt the Java predicate to a Scala function before handing it to the Scala DSL.
  val diverted = delegate.divertTo(that, (elem: Out) => when.test(elem))
  new SubSource(diverted)
}
/**
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
*
* '''Emits when''' element is available and demand exists from the downstream; the element will
* also be sent to the wire-tap Sink if there is demand.
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def wireTap(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat] = {
  // Forward the sink graph to the Scala DSL and re-wrap the result for the Java DSL.
  val tapped = delegate.wireTap(that)
  new SubSource(tapped)
}
/**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements are ready.
*
* '''Emits when''' one of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete
*
* '''Cancels when''' downstream cancels
*/
def merge(that: Graph[SourceShape[Out], _]): SubSource[Out, Mat] = {
  // Forward the graph to the Scala DSL and re-wrap the result for the Java DSL.
  val merged = delegate.merge(that)
  new SubSource(merged)
}
/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
* then repeat process.
*
* Example:
* {{{
* Source.from(Arrays.asList(1, 2, 3)).interleave(Source.from(Arrays.asList(4, 5, 6, 7)), 2)
* // 1, 2, 4, 5, 3, 6, 7
* }}}
*
* After one of the sources completes, all remaining elements will be emitted from the other one
*
* If one of sources gets upstream error - stream completes with failure.
*
* '''Emits when''' element is available from the currently consumed upstream
*
* '''Backpressures when''' downstream backpressures. Signal to current
* upstream, switch to next upstream when received `segmentSize` elements
*
* '''Completes when''' this [[Source]] and given one completes
*
* '''Cancels when''' downstream cancels
*/
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): SubSource[Out, Mat] = {
  // Both arguments are forwarded as-is to the Scala DSL.
  val interleaved = delegate.interleave(that, segmentSize)
  new SubSource(interleaved)
}
/**
* Merge the given [[Source]] to this [[Source]], taking elements as they arrive from input streams,
* picking always the smallest of the available elements (waiting for one element from each side
* to be available). This means that possible contiguity of the input streams is not exploited to avoid
* waiting for elements, this merge will block when one of the inputs does not have more elements (and
* does not complete).
*
* '''Emits when''' all of the inputs have an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete
*
* '''Cancels when''' downstream cancels
*/
def mergeSorted[M](that: Graph[SourceShape[Out], M], comp: Comparator[Out]): javadsl.SubSource[Out, Mat] = {
  // The Scala DSL expects an Ordering, so the Java Comparator is adapted first.
  val ordering: Ordering[Out] = Ordering.comparatorToOrdering(comp)
  new SubSource(delegate.mergeSorted(that)(ordering))
}
/**
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples.
*
* '''Emits when''' all of the inputs have an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zip[T](source: Graph[SourceShape[T], _]): SubSource[akka.japi.Pair[Out @uncheckedVariance, T], Mat] =
  // Restored the arrow of the pattern-matching lambda: `case (o, t) akka.japi.Pair.create(o, t)` is not valid Scala.
  // Each Scala tuple is converted to a Java-friendly Pair.
  new SubSource(delegate.zip(source).map { case (o, t) => akka.japi.Pair.create(o, t) })
/**
* Put together the elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function.
*
* '''Emits when''' all of the inputs have an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipWith[Out2, Out3](
  that: Graph[SourceShape[Out2], _],
  combine: function.Function2[Out, Out2, Out3]): SubSource[Out3, Mat] = {
  // Adapt the Java combiner to a Scala function before handing it to the Scala DSL.
  val scalaCombine: (Out, Out2) => Out3 = combinerToScala(combine)
  new SubSource(delegate.zipWith[Out2, Out3](that)(scalaCombine))
}
/**
* Combine the elements of current [[Source]] into a stream of tuples consisting
* of all elements paired with their index. Indices start at 0.
*
* '''Emits when''' upstream emits an element and is paired with their index
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipWithIndex: javadsl.SubSource[akka.japi.Pair[Out @uncheckedVariance, Long], Mat] =
  // Restored the arrow of the pattern-matching lambda: `case (elem, index) akka.japi.Pair(elem, index)` is not valid Scala.
  // Each (element, index) tuple is converted to a Java-friendly Pair.
  new SubSource(delegate.zipWithIndex.map { case (elem, index) => akka.japi.Pair(elem, index) })
/**
* If the first element has not passed through this operator before the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before first element arrives
*
* '''Cancels when''' downstream cancels
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def initialTimeout(timeout: FiniteDuration): SubSource[Out, Mat] = {
  // Forward the timeout to the Scala DSL and re-wrap the result for the Java DSL.
  val guarded = delegate.initialTimeout(timeout)
  new SubSource(guarded)
}
/**
* If the first element has not passed through this operator before the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before first element arrives
*
* '''Cancels when''' downstream cancels
*/
def initialTimeout(timeout: java.time.Duration): SubSource[Out, Mat] =
  // Convert the java.time.Duration and call the Scala DSL directly rather than
  // routing through the deprecated FiniteDuration overload.
  new SubSource(delegate.initialTimeout(timeout.asScala))
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before upstream completes
*
* '''Cancels when''' downstream cancels
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def completionTimeout(timeout: FiniteDuration): SubSource[Out, Mat] = {
  // Apply the operator on the wrapped Scala sub-flow, then re-wrap it for the Java DSL.
  val withCompletionTimeout = delegate.completionTimeout(timeout)
  new SubSource(withCompletionTimeout)
}
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before upstream completes
*
* '''Cancels when''' downstream cancels
*/
def completionTimeout(timeout: java.time.Duration): SubSource[Out, Mat] =
  // Convert the java.time.Duration and call the Scala DSL directly rather than
  // routing through the deprecated FiniteDuration overload.
  new SubSource(delegate.completionTimeout(timeout.asScala))
/**
* If the time between two processed elements exceeds the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements
*
* '''Cancels when''' downstream cancels
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def idleTimeout(timeout: FiniteDuration): SubSource[Out, Mat] = {
  // Apply the operator on the wrapped Scala sub-flow, then re-wrap it for the Java DSL.
  val withIdleTimeout = delegate.idleTimeout(timeout)
  new SubSource(withIdleTimeout)
}
/**
* If the time between two processed elements exceeds the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements
*
* '''Cancels when''' downstream cancels
*/
def idleTimeout(timeout: java.time.Duration): SubSource[Out, Mat] =
  // Convert the java.time.Duration and call the Scala DSL directly rather than
  // routing through the deprecated FiniteDuration overload.
  new SubSource(delegate.idleTimeout(timeout.asScala))
/**
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand.
*
* '''Cancels when''' downstream cancels
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def backpressureTimeout(timeout: FiniteDuration): SubSource[Out, Mat] = {
  // Apply the operator on the wrapped Scala sub-flow, then re-wrap it for the Java DSL.
  val withBackpressureTimeout = delegate.backpressureTimeout(timeout)
  new SubSource(withBackpressureTimeout)
}
/**
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand.
*
* '''Cancels when''' downstream cancels
*/
def backpressureTimeout(timeout: java.time.Duration): SubSource[Out, Mat] =
  // Convert the java.time.Duration and call the Scala DSL directly rather than
  // routing through the deprecated FiniteDuration overload.
  new SubSource(delegate.backpressureTimeout(timeout.asScala))
/**
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
* operator attempts to maintain a base rate of emitted elements towards the downstream.
*
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
* do not accumulate during this period.
*
* Upstream elements are always preferred over injected elements.
*
* '''Emits when''' upstream emits an element or if the upstream was idle for the configured period
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def keepAlive(maxIdle: FiniteDuration, injectedElem: function.Creator[Out]): SubSource[Out, Mat] =
  // Wrap the Java Creator in a zero-argument Scala function; create() is invoked
  // each time the Scala DSL calls that function.
  new SubSource(delegate.keepAlive(maxIdle, () => injectedElem.create()))
/**
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
* operator attempts to maintain a base rate of emitted elements towards the downstream.
*
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
* do not accumulate during this period.
*
* Upstream elements are always preferred over injected elements.
*
* '''Emits when''' upstream emits an element or if the upstream was idle for the configured period
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubSource[Out, Mat] =
  // Convert the duration and adapt the Creator here, calling the Scala DSL directly
  // rather than routing through the deprecated FiniteDuration overload.
  new SubSource(delegate.keepAlive(maxIdle.asScala, () => injectedElem.create()))
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this operator sets the maximum rate
* for emitting messages. This operator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element costs. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and
* started.
*
* The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet throttle rate.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def throttle(elements: Int, per: java.time.Duration): javadsl.SubSource[Out, Mat] = {
  // The Scala DSL takes a FiniteDuration, so convert before delegating.
  val interval = per.asScala
  new SubSource(delegate.throttle(elements, interval))
}
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this operator sets the maximum rate
* for emitting messages. This operator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element costs. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started.
*
* Parameter `mode` manages behavior when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int,
             mode: ThrottleMode): javadsl.SubSource[Out, Mat] = {
  // Apply the operator on the wrapped Scala sub-flow, then re-wrap it for the Java DSL.
  val throttled = delegate.throttle(elements, per, maximumBurst, mode)
  new SubSource(throttled)
}
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this operator sets the maximum rate
* for emitting messages. This operator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element costs. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started.
*
* Parameter `mode` manages behavior when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int,
             mode: ThrottleMode): javadsl.SubSource[Out, Mat] = {
  // The Scala DSL takes a FiniteDuration, so convert before delegating.
  val interval = per.asScala
  new SubSource(delegate.throttle(elements, interval, maximumBurst, mode))
}
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
* calculated for each element individually by calling the `costCalculation` function.
* This operator works for streams when elements have different cost(length).
* Streams of `ByteString` for example.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element costs. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and
* started.
*
* The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet throttle rate.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def throttle(cost: Int, per: java.time.Duration,
             costCalculation: function.Function[Out, Integer]): javadsl.SubSource[Out, Mat] = {
  // The Scala DSL takes a FiniteDuration, so convert before delegating.
  val interval = per.asScala
  // `costCalculation.apply _` turns the Java function's apply method into a Scala function value.
  new SubSource(delegate.throttle(cost, interval, costCalculation.apply _))
}
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
* calculated for each element individually by calling the `costCalculation` function.
* This operator works for streams when elements have different cost(length).
* Streams of `ByteString` for example.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element costs. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started.
*
* Parameter `mode` manages behavior when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int,
             costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubSource[Out, Mat] = {
  // `costCalculation.apply _` turns the Java function's apply method into a Scala function value.
  val throttled = delegate.throttle(cost, per, maximumBurst, costCalculation.apply _, mode)
  new SubSource(throttled)
}
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
* calculated for each element individually by calling the `costCalculation` function.
* This operator works for streams when elements have different cost(length).
* Streams of `ByteString` for example.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element costs. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started.
*
* Parameter `mode` manages behavior when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int,
             costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubSource[Out, Mat] = {
  // The Scala DSL takes a FiniteDuration, so convert before delegating.
  val interval = per.asScala
  // `costCalculation.apply _` turns the Java function's apply method into a Scala function value.
  new SubSource(delegate.throttle(cost, interval, maximumBurst, costCalculation.apply _, mode))
}
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this operator when you need just slow down a stream without worrying about exact amount
* of time between events.
*
* If you want to be sure that no time interval has more than the specified number of events you need to use
* [[throttle()]] with the `maximumBurst` parameter.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = {
  // Apply the operator on the wrapped Scala sub-flow, then re-wrap it for the Java DSL.
  val evenlyThrottled = delegate.throttleEven(elements, per, mode)
  new SubSource(evenlyThrottled)
}
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this operator when you need just slow down a stream without worrying about exact amount
* of time between events.
*
* If you want to be sure that no time interval has more than the specified number of events you need to use
* [[throttle()]] with the `maximumBurst` parameter.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
  // Convert the duration and call the Scala DSL directly instead of going through
  // the FiniteDuration overload.
  new SubSource(delegate.throttleEven(elements, per.asScala, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this operator when you need just slow down a stream without worrying about exact amount
* of time between events.
*
* If you want to be sure that no time interval has more than the specified number of events you need to use
* [[throttle()]] with the `maximumBurst` parameter.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(cost: Int, per: FiniteDuration,
                 costCalculation: (Out) => Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
  // costCalculation is already a Scala function, so it is passed through unchanged.
  new SubSource(delegate.throttleEven(cost, per, costCalculation, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this operator when you need just slow down a stream without worrying about exact amount
* of time between events.
*
* If you want to be sure that no time interval has more than the specified number of events you need to use
* [[throttle()]] with the `maximumBurst` parameter.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(cost: Int, per: java.time.Duration,
                 costCalculation: (Out) => Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
  // Convert the duration and call the Scala DSL directly; costCalculation is already
  // a Scala function and is passed through unchanged.
  new SubSource(delegate.throttleEven(cost, per.asScala, costCalculation, mode))
/**
* Detaches upstream demand from downstream demand without detaching the
* stream rates; in other words acts like a buffer of size 1.
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def detach: javadsl.SubSource[Out, Mat] = {
  // Apply the operator on the wrapped Scala sub-flow, then re-wrap it for the Java DSL.
  val detached = delegate.detach
  new SubSource(detached)
}
/**
* Delays the initial element by the specified duration.
*
* '''Emits when''' upstream emits an element if the initial delay is already elapsed
*
* '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def initialDelay(delay: FiniteDuration): SubSource[Out, Mat] = {
  // Apply the operator on the wrapped Scala sub-flow, then re-wrap it for the Java DSL.
  val delayed = delegate.initialDelay(delay)
  new SubSource(delayed)
}
/**
* Delays the initial element by the specified duration.
*
* '''Emits when''' upstream emits an element if the initial delay is already elapsed
*
* '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def initialDelay(delay: java.time.Duration): SubSource[Out, Mat] =
  // Convert the java.time.Duration and call the Scala DSL directly rather than
  // routing through the deprecated FiniteDuration overload.
  new SubSource(delegate.initialDelay(delay.asScala))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing operators).
*/
def withAttributes(attr: Attributes): SubSource[Out, Mat] = {
  // Apply the attributes on the wrapped Scala sub-flow, then re-wrap it for the Java DSL.
  val reattributed = delegate.withAttributes(attr)
  new SubSource(reattributed)
}
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing operators).
*/
def addAttributes(attr: Attributes): SubSource[Out, Mat] = {
  // Add the attributes on the wrapped Scala sub-flow, then re-wrap it for the Java DSL.
  val extended = delegate.addAttributes(attr)
  new SubSource(extended)
}
/**
* Add a ``name`` attribute to this Flow.
*/
def named(name: String): SubSource[Out, Mat] = {
  // Name the wrapped Scala sub-flow, then re-wrap it for the Java DSL.
  val labelled = delegate.named(name)
  new SubSource(labelled)
}
/**
* Put an asynchronous boundary around this `SubSource`
*/
def async: SubSource[Out, Mat] = {
  // Mark the wrapped Scala sub-flow as async, then re-wrap it for the Java DSL.
  val asyncDelegate = delegate.async
  new SubSource(asyncDelegate)
}
/**
* Logs elements flowing through the stream as well as completion and erroring.
*
* By default element and completion signals are logged on debug level, and errors are logged on Error level.
* This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow:
*
* The `extract` function will be applied to each element before logging, so it is possible to log only those fields
* of a complex object flowing through this element.
*
* Uses the given [[LoggingAdapter]] for logging.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): SubSource[Out, Mat] =
  // Adapt the Java extract function to a Scala lambda; the adapter goes in the
  // Scala DSL's second parameter list.
  new SubSource(delegate.log(name, e => extract.apply(e))(log))
/**
* Logs elements flowing through the stream as well as completion and erroring.
*
* By default element and completion signals are logged on debug level, and errors are logged on Error level.
* This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow:
*
* The `extract` function will be applied to each element before logging, so it is possible to log only those fields
* of a complex object flowing through this element.
*
* Uses an internally created [[LoggingAdapter]] which uses `akka.stream.Log` as its source (use this class to configure slf4j loggers).
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def log(name: String, extract: function.Function[Out, Any]): SubSource[Out, Mat] = {
  // No adapter is supplied by the caller; null is forwarded to the three-argument overload.
  val noAdapter: LoggingAdapter = null
  log(name, extract, noAdapter)
}
/**
* Logs elements flowing through the stream as well as completion and erroring.
*
* By default element and completion signals are logged on debug level, and errors are logged on Error level.
* This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow:
*
* Uses the given [[LoggingAdapter]] for logging.
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def log(name: String, log: LoggingAdapter): SubSource[Out, Mat] = {
  // No extract function was given, so the identity function from ConstantFun is used.
  val identityExtract: function.Function[Out, Any] = ConstantFun.javaIdentityFunction[Out]
  // `this.` is required: the `log` parameter shadows the method name here.
  this.log(name, identityExtract, log)
}
/**
* Logs elements flowing through the stream as well as completion and erroring.
*
* By default element and completion signals are logged on debug level, and errors are logged on Error level.
* This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow.
*
* Uses an internally created [[LoggingAdapter]] which uses `akka.stream.Log` as its source (use this class to configure slf4j loggers).
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def log(name: String): SubSource[Out, Mat] = {
  // Neither an extract function nor an adapter was given: use the identity function
  // from ConstantFun and forward null as the adapter to the three-argument overload.
  val identityExtract: function.Function[Out, Any] = ConstantFun.javaIdentityFunction[Out]
  val noAdapter: LoggingAdapter = null
  log(name, identityExtract, noAdapter)
}
}