Add some javadsl Flow methods to Source as well (#25476)

This commit is contained in:
Christopher Hunt 2018-08-20 19:24:14 +10:00 committed by Arnout Engelen
parent 9e66f7121f
commit 5be8d975c4
2 changed files with 254 additions and 0 deletions

View file

@ -25,6 +25,7 @@ import scala.concurrent.{ Future, Promise }
import scala.compat.java8.OptionConverters._
import java.util.concurrent.CompletionStage
import java.util.concurrent.CompletableFuture
import java.util.function.Supplier
import scala.compat.java8.FutureConverters._
import scala.reflect.ClassTag
@ -895,6 +896,29 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): javadsl.Source[Out, Mat] =
new Source(delegate.interleave(that, segmentSize))
/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
* then repeat process.
*
* If eagerClose is false and one of the upstreams complete the elements from the other upstream will continue passing
* through the interleave operator. If eagerClose is true and one of the upstream complete interleave will cancel the
* other upstream and complete itself.
*
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
*
* '''Emits when''' element is available from the currently consumed upstream
*
* '''Backpressures when''' downstream backpressures. Signal to current
* upstream, switch to next upstream when received `segmentSize` elements
*
* '''Completes when''' the [[Flow]] and given [[Source]] completes
*
* '''Cancels when''' downstream cancels
*/
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int, eagerClose: Boolean): javadsl.Source[Out, Mat] =
new Source(delegate.interleave(that, segmentSize, eagerClose))
/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
@ -913,6 +937,26 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
new Source(delegate.interleaveMat(that, segmentSize)(combinerToScala(matF)))
/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
* then repeat process.
*
* If eagerClose is false and one of the upstreams complete the elements from the other upstream will continue passing
* through the interleave operator. If eagerClose is true and one of the upstream complete interleave will cancel the
* other upstream and complete itself.
*
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#interleave]]
*/
def interleaveMat[M, M2](that: Graph[SourceShape[Out], M], segmentSize: Int, eagerClose: Boolean,
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
new Source(delegate.interleaveMat(that, segmentSize, eagerClose)(combinerToScala(matF)))
/**
* Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
* picking randomly when several elements ready.
@ -928,6 +972,21 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def merge(that: Graph[SourceShape[Out], _]): javadsl.Source[Out, Mat] =
new Source(delegate.merge(that))
/**
* Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* '''Emits when''' one of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*/
def merge(that: Graph[SourceShape[Out], _], eagerComplete: Boolean): javadsl.Source[Out, Mat] =
new Source(delegate.merge(that, eagerComplete))
/**
* Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
* picking randomly when several elements ready.
@ -942,6 +1001,21 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
new Source(delegate.mergeMat(that)(combinerToScala(matF)))
/**
* Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#merge]]
*/
def mergeMat[M, M2](
that: Graph[SourceShape[Out], M],
matF: function.Function2[Mat, M, M2],
eagerComplete: Boolean): javadsl.Source[Out, M2] =
new Source(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF)))
/**
* Merge the given [[Source]] to this [[Source]], taking elements as they arrive from input streams,
* picking always the smallest of the available elements (waiting for one element from each side
@ -1119,6 +1193,27 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def recover(pf: PartialFunction[Throwable, Out]): javadsl.Source[Out, Mat] =
new Source(delegate.recover(pf))
/**
* Recover allows to send last element on failure and gracefully complete the stream
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*/
@deprecated("Use recoverWithRetries instead.", "2.4.4")
def recover(clazz: Class[_ <: Throwable], supplier: Supplier[Out]): javadsl.Source[Out, Mat] =
recover {
case elem if clazz.isInstance(elem) supplier.get()
}
/**
* While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
@ -1164,6 +1259,31 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def recoverWith(pf: PartialFunction[Throwable, _ <: Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
new Source(delegate.recoverWith(pf))
/**
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def recoverWith(clazz: Class[_ <: Throwable], supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
recoverWith {
case elem if clazz.isInstance(elem) supplier.get()
}
/**
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered up to `attempts` number of times so that each time there is a failure
@ -1190,6 +1310,37 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def recoverWithRetries(attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
new Source(delegate.recoverWithRetries(attempts, pf))
/**
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered up to `attempts` number of times so that each time there is a failure
* it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
* attempt to recover at all.
*
* A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
* @param attempts Maximum number of retries or -1 to retry indefinitely
* @param clazz the class object of the failure cause
* @param supplier supply the new Source to be materialized
*/
def recoverWithRetries(attempts: Int, clazz: Class[_ <: Throwable], supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
recoverWithRetries(attempts, {
case elem if clazz.isInstance(elem) supplier.get()
})
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream.
@ -1917,6 +2068,29 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def dropWithin(d: java.time.Duration): javadsl.Source[Out, Mat] =
dropWithin(d.asScala)
/**
* Terminate processing (and cancel the upstream publisher) after predicate
* returns false for the first time, including the first failed element if inclusive is true
* Due to input buffering some elements may have been requested from upstream publishers
* that will then not be processed downstream of this step.
*
* The stream will be completed without producing any elements if predicate is false for
* the first stream element.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the predicate is true
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes
*
* '''Cancels when''' predicate returned false or downstream cancels
*
* See also [[Source.limit]], [[Source.limitWeighted]]
*/
def takeWhile(p: function.Predicate[Out], inclusive: Boolean): javadsl.Source[Out, Mat] = new Source(delegate.takeWhile(p.test, inclusive))
/**
* Terminate processing (and cancel the upstream publisher) after predicate
* returns false for the first time. Due to input buffering some elements may have been
@ -1933,6 +2107,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* '''Completes when''' predicate returned false or upstream completes
*
* '''Cancels when''' predicate returned false or downstream cancels
*
* See also [[Source.limit]], [[Source.limitWeighted]]
*/
def takeWhile(p: function.Predicate[Out]): javadsl.Source[Out, Mat] = new Source(delegate.takeWhile(p.test))
@ -2262,6 +2438,62 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def prefixAndTail(n: Int): javadsl.Source[Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance, NotUsed]], Mat] =
new Source(delegate.prefixAndTail(n).map { case (taken, tail) Pair(taken.asJava, tail.asJava) })
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
* using the given function. When a new key is encountered for the first time
* a new substream is opened and subsequently fed with all elements belonging to
* that key.
*
* WARNING: If `allowClosedSubstreamRecreation` is set to `false` (default behavior) the operator
* keeps track of all keys of streams that have already been closed. If you expect an infinite
* number of keys this can cause memory issues. Elements belonging to those keys are drained
* directly and not send to the substream.
*
* Note: If `allowClosedSubstreamRecreation` is set to `true` substream completion and incoming
* elements are subject to race-conditions. If elements arrive for a stream that is in the process
* of closing these elements might get lost.
*
* The object returned from this method is not a normal [[Flow]],
* it is a [[SubFlow]]. This means that after this operator all transformations
* are applied to all encountered substreams in the same fashion. Substream mode
* is exited either by closing the substream (i.e. connecting it to a [[Sink]])
* or by merging the substreams back together; see the `to` and `mergeBack` methods
* on [[SubFlow]] for more information.
*
* It is important to note that the substreams also propagate back-pressure as
* any other stream, which means that blocking one substream will block the `groupBy`
* operator itselfand thereby all substreamsonce all internal or
* explicit buffers are filled.
*
* If the group by function `f` throws an exception and the supervision decision
* is [[akka.stream.Supervision#stop]] the stream and substreams will be completed
* with failure.
*
* If the group by function `f` throws an exception and the supervision decision
* is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]]
* the element is dropped and the stream and substreams continue.
*
* Function `f` MUST NOT return `null`. This will throw exception and trigger supervision decision mechanism.
*
* '''Emits when''' an element for which the grouping function returns a group that has not yet been created.
* Emits the new group
*
* '''Backpressures when''' there is an element pending for a group whose substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and all substreams cancel
*
* @param maxSubstreams configures the maximum number of substreams (keys)
* that are supported; if more distinct keys are encountered then the stream fails
* @param f computes the key for each element
* @param allowClosedSubstreamRecreation enables recreation of already closed substreams if elements with their
* corresponding keys arrive after completion
*/
def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K], allowClosedSubstreamRecreation: Boolean): SubSource[Out, Mat] =
new SubSource(delegate.groupBy(maxSubstreams, f.apply, allowClosedSubstreamRecreation))
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
@ -2360,6 +2592,16 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def splitWhen(p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitWhen(p.test))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams, always beginning a new one with
* the current element if the given predicate returns true for it.
*
* @see [[#splitWhen]]
*/
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitWhen(substreamCancelStrategy)(p.test))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams. It *ends* the current substream when the
@ -2407,6 +2649,16 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def splitAfter(p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitAfter(p.test))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams. It *ends* the current substream when the
* predicate is true.
*
* @see [[#splitAfter]]
*/
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitAfter(substreamCancelStrategy)(p.test))
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by concatenation,