diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index e2e6c27db9..889a0dfae8 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -1318,6 +1318,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * '''Cancels when''' downstream cancels
    */
+  @deprecated("Use recoverWithRetries instead.", "2.4.4")
   def recover(pf: PartialFunction[Throwable, Out]): javadsl.Flow[In, Out, Mat] =
     new Flow(delegate.recover(pf))

@@ -1336,6 +1337,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * '''Cancels when''' downstream cancels
    */
+  @deprecated("Use recoverWithRetries instead.", "2.4.4")
   def recover(clazz: Class[_ <: Throwable], supplier: Supplier[Out]): javadsl.Flow[In, Out, Mat] =
     recover {
       case elem if clazz.isInstance(elem) ⇒ supplier.get()
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 65354f419f..59ed8f1baa 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -25,6 +25,7 @@ import scala.concurrent.{ Future, Promise }
 import scala.compat.java8.OptionConverters._
 import java.util.concurrent.CompletionStage
 import java.util.concurrent.CompletableFuture
+import java.util.function.Supplier
 import scala.compat.java8.FutureConverters._
 import scala.reflect.ClassTag
@@ -895,6 +896,29 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
   def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): javadsl.Source[Out, Mat] =
     new Source(delegate.interleave(that, segmentSize))

+  /**
+   * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
+   * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
+   * then repeat process.
+   *
+   * If eagerClose is false and one of the upstreams completes, the elements from the other upstream will continue passing
+   * through the interleave operator. If eagerClose is true and one of the upstreams completes, interleave will cancel the
+   * other upstream and complete itself.
+   *
+   * If this [[Source]] or the given [[Source]] gets an upstream error, the stream completes with failure.
+   *
+   * '''Emits when''' element is available from the currently consumed upstream
+   *
+   * '''Backpressures when''' downstream backpressures. Signal to current
+   * upstream, switch to next upstream when received `segmentSize` elements
+   *
+   * '''Completes when''' this [[Source]] and the given [[Source]] complete
+   *
+   * '''Cancels when''' downstream cancels
+   */
+  def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int, eagerClose: Boolean): javadsl.Source[Out, Mat] =
+    new Source(delegate.interleave(that, segmentSize, eagerClose))
+
   /**
    * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
    * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
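The `eagerClose` flag is the only behavioural difference from the existing two-argument `interleave`. A minimal Java-side usage sketch (not part of the patch; the class name, sample data and the classic ActorSystem/ActorMaterializer setup are illustrative assumptions):

```java
import java.util.Arrays;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class InterleaveExample {
  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("interleave-example");
    final Materializer mat = ActorMaterializer.create(system);

    final Source<Integer, NotUsed> shortSource = Source.from(Arrays.asList(1, 2));
    final Source<Integer, NotUsed> longSource = Source.from(Arrays.asList(10, 20, 30, 40, 50, 60));

    // eagerClose = false: after shortSource completes, the rest of longSource keeps
    // flowing, so this stream emits 1, 2, 10, 20, 30, 40, 50, 60.
    shortSource.interleave(longSource, 2, false).runWith(Sink.foreach(System.out::println), mat);

    // eagerClose = true: once shortSource completes, interleave cancels longSource and
    // completes itself, so the remaining elements of longSource are not drained.
    shortSource.interleave(longSource, 2, true).runWith(Sink.foreach(System.out::println), mat);
  }
}
```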
@@ -913,6 +937,26 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
     matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
     new Source(delegate.interleaveMat(that, segmentSize)(combinerToScala(matF)))

+  /**
+   * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
+   * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
+   * then repeat process.
+   *
+   * If eagerClose is false and one of the upstreams completes, the elements from the other upstream will continue passing
+   * through the interleave operator. If eagerClose is true and one of the upstreams completes, interleave will cancel the
+   * other upstream and complete itself.
+   *
+   * If this [[Source]] or the given [[Source]] gets an upstream error, the stream completes with failure.
+   *
+   * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
+   * where appropriate instead of manually writing functions that pass through one of the values.
+   *
+   * @see [[#interleave]]
+   */
+  def interleaveMat[M, M2](that: Graph[SourceShape[Out], M], segmentSize: Int, eagerClose: Boolean,
+    matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
+    new Source(delegate.interleaveMat(that, segmentSize, eagerClose)(combinerToScala(matF)))
+
   /**
    * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
    * picking randomly when several elements ready.
@@ -928,6 +972,21 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
   def merge(that: Graph[SourceShape[Out], _]): javadsl.Source[Out, Mat] =
     new Source(delegate.merge(that))

+  /**
+   * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
+   * picking randomly when several elements ready.
+   *
+   * '''Emits when''' one of the inputs has an element available
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
+   *
+   * '''Cancels when''' downstream cancels
+   */
+  def merge(that: Graph[SourceShape[Out], _], eagerComplete: Boolean): javadsl.Source[Out, Mat] =
+    new Source(delegate.merge(that, eagerComplete))
+
   /**
    * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
    * picking randomly when several elements ready.
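The new `eagerComplete` flag only changes the completion behaviour of `merge`; emission and backpressure are unchanged. A minimal sketch (not part of the patch; the tick source, class name and setup are illustrative assumptions):

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class MergeExample {
  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("merge-example");
    final Materializer mat = ActorMaterializer.create(system);

    final FiniteDuration oneSecond = Duration.create(1, TimeUnit.SECONDS);
    final Source<String, NotUsed> letters = Source.from(Arrays.asList("a", "b", "c"));
    final Source<String, ?> ticks = Source.tick(oneSecond, oneSecond, "tick");

    // With eagerComplete = false (the behaviour of the existing merge overload) the merged
    // stream completes only when *all* upstreams complete, i.e. never here, because the
    // tick source is infinite. With eagerComplete = true it completes as soon as `letters`
    // completes, cancelling the tick source.
    letters.merge(ticks, true).runWith(Sink.foreach(System.out::println), mat);
  }
}
```

The `interleaveMat` and `mergeMat` variants behave the same way and only add the materialized-value combiner.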
@@ -942,6 +1001,21 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
     matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
     new Source(delegate.mergeMat(that)(combinerToScala(matF)))

+  /**
+   * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
+   * picking randomly when several elements ready.
+   *
+   * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
+   * where appropriate instead of manually writing functions that pass through one of the values.
+   *
+   * @see [[#merge]]
+   */
+  def mergeMat[M, M2](
+    that: Graph[SourceShape[Out], M],
+    matF: function.Function2[Mat, M, M2],
+    eagerComplete: Boolean): javadsl.Source[Out, M2] =
+    new Source(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF)))
+
   /**
    * Merge the given [[Source]] to this [[Source]], taking elements as they arrive from input streams,
    * picking always the smallest of the available elements (waiting for one element from each side
@@ -1119,6 +1193,27 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
   def recover(pf: PartialFunction[Throwable, Out]): javadsl.Source[Out, Mat] =
     new Source(delegate.recover(pf))

+  /**
+   * Recover allows to send last element on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   */
+  @deprecated("Use recoverWithRetries instead.", "2.4.4")
+  def recover(clazz: Class[_ <: Throwable], supplier: Supplier[Out]): javadsl.Source[Out, Mat] =
+    recover {
+      case elem if clazz.isInstance(elem) ⇒ supplier.get()
+    }
+
   /**
    * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
@@ -1164,6 +1259,31 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
   def recoverWith(pf: PartialFunction[Throwable, _ <: Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
     new Source(delegate.recoverWith(pf))

+  /**
+   * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
+   * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
+   * Source may be materialized.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed and element is available
+   * from alternative Source
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   */
+  def recoverWith(clazz: Class[_ <: Throwable], supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
+    recoverWith {
+      case elem if clazz.isInstance(elem) ⇒ supplier.get()
+    }
+
   /**
    * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
    * a failure has been recovered up to `attempts` number of times so that each time there is a failure
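A minimal sketch of the class-based `recoverWith` overload (not part of the patch; the failing source, fallback elements, class name and setup are illustrative assumptions):

```java
import java.util.Arrays;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class RecoverWithExample {
  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("recover-with-example");
    final Materializer mat = ActorMaterializer.create(system);

    // A source that emits two elements and then fails.
    final Source<String, NotUsed> failing =
      Source.from(Arrays.asList("a", "b"))
        .concat(Source.<String>failed(new IllegalStateException("boom")));

    // Every time the stream fails with an IllegalStateException the supplied fallback
    // Source is materialized instead; other failures are still propagated downstream.
    failing
      .recoverWith(IllegalStateException.class, () -> Source.from(Arrays.asList("x", "y")))
      .runWith(Sink.foreach(System.out::println), mat); // typically prints a, b, x, y
  }
}
```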
@@ -1190,6 +1310,37 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
   def recoverWithRetries(attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
     new Source(delegate.recoverWithRetries(attempts, pf))

+  /**
+   * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
+   * a failure has been recovered up to `attempts` number of times so that each time there is a failure
+   * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
+   * attempt to recover at all.
+   *
+   * A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed and element is available
+   * from alternative Source
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param attempts Maximum number of retries or -1 to retry indefinitely
+   * @param clazz the class object of the failure cause
+   * @param supplier supply the new Source to be materialized
+   */
+  def recoverWithRetries(attempts: Int, clazz: Class[_ <: Throwable], supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
+    recoverWithRetries(attempts, {
+      case elem if clazz.isInstance(elem) ⇒ supplier.get()
+    })
+
   /**
    * Transform each input element into an `Iterable` of output elements that is
    * then flattened into the output stream.
@@ -1917,6 +2068,29 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
   def dropWithin(d: java.time.Duration): javadsl.Source[Out, Mat] =
     dropWithin(d.asScala)

+  /**
+   * Terminate processing (and cancel the upstream publisher) after predicate
+   * returns false for the first time, including the first failed element if `inclusive` is true.
+   * Due to input buffering some elements may have been requested from upstream publishers
+   * that will then not be processed downstream of this step.
+   *
+   * The stream will be completed without producing any elements if predicate is false for
+   * the first stream element.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the predicate is true
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive`) or upstream completes
+   *
+   * '''Cancels when''' predicate returned false or downstream cancels
+   *
+   * See also [[Source.limit]], [[Source.limitWeighted]]
+   */
+  def takeWhile(p: function.Predicate[Out], inclusive: Boolean): javadsl.Source[Out, Mat] = new Source(delegate.takeWhile(p.test, inclusive))
+
   /**
    * Terminate processing (and cancel the upstream publisher) after predicate
    * returns false for the first time. Due to input buffering some elements may have been
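A minimal sketch of the class-based `recoverWithRetries` overload (not part of the patch; names, data and setup are illustrative assumptions). With `attempts = 1` the fallback is materialized for the first matching failure only; a failure of the fallback itself would then be propagated downstream:

```java
import java.util.Arrays;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class RecoverWithRetriesExample {
  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("recover-with-retries-example");
    final Materializer mat = ActorMaterializer.create(system);

    // A source that emits two elements and then fails with a RuntimeException.
    final Source<String, NotUsed> failing =
      Source.from(Arrays.asList("a", "b"))
        .concat(Source.<String>failed(new RuntimeException("boom")));

    failing
      // Recover from RuntimeException (and subclasses) at most once; 0 would disable
      // recovery entirely and a negative value would retry indefinitely, like recoverWith.
      .recoverWithRetries(1, RuntimeException.class, () -> Source.single("fallback"))
      .runWith(Sink.foreach(System.out::println), mat); // typically prints a, b, fallback
  }
}
```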
@@ -1933,6 +2107,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
    * '''Completes when''' predicate returned false or upstream completes
    *
    * '''Cancels when''' predicate returned false or downstream cancels
+   *
+   * See also [[Source.limit]], [[Source.limitWeighted]]
    */
   def takeWhile(p: function.Predicate[Out]): javadsl.Source[Out, Mat] = new Source(delegate.takeWhile(p.test))

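A minimal sketch of the `inclusive` flag introduced on `takeWhile` just above (not part of the patch; class name, data and setup are illustrative assumptions):

```java
import java.util.Arrays;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class TakeWhileExample {
  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("take-while-example");
    final Materializer mat = ActorMaterializer.create(system);

    final Source<Integer, NotUsed> numbers = Source.from(Arrays.asList(1, 2, 3, 4, 5));

    // inclusive = false behaves like the single-argument overload: emits 1, 2.
    numbers.takeWhile(n -> n < 3, false).runWith(Sink.foreach(System.out::println), mat);

    // inclusive = true also emits the first element that fails the predicate: emits 1, 2, 3.
    numbers.takeWhile(n -> n < 3, true).runWith(Sink.foreach(System.out::println), mat);
  }
}
```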
@@ -2262,6 +2438,62 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
   def prefixAndTail(n: Int): javadsl.Source[Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance, NotUsed]], Mat] =
     new Source(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ Pair(taken.asJava, tail.asJava) })

+  /**
+   * This operation demultiplexes the incoming stream into separate output
+   * streams, one for each element key. The key is computed for each element
+   * using the given function. When a new key is encountered for the first time
+   * a new substream is opened and subsequently fed with all elements belonging to
+   * that key.
+   *
+   * WARNING: If `allowClosedSubstreamRecreation` is set to `false` (default behavior) the operator
+   * keeps track of all keys of streams that have already been closed. If you expect an infinite
+   * number of keys this can cause memory issues. Elements belonging to those keys are drained
+   * directly and not sent to the substream.
+   *
+   * Note: If `allowClosedSubstreamRecreation` is set to `true` substream completion and incoming
+   * elements are subject to race conditions. If elements arrive for a stream that is in the process
+   * of closing, these elements might get lost.
+   *
+   * The object returned from this method is not a normal [[Flow]],
+   * it is a [[SubFlow]]. This means that after this operator all transformations
+   * are applied to all encountered substreams in the same fashion. Substream mode
+   * is exited either by closing the substream (i.e. connecting it to a [[Sink]])
+   * or by merging the substreams back together; see the `to` and `mergeBack` methods
+   * on [[SubFlow]] for more information.
+   *
+   * It is important to note that the substreams also propagate back-pressure as
+   * any other stream, which means that blocking one substream will block the `groupBy`
+   * operator itself—and thereby all substreams—once all internal or
+   * explicit buffers are filled.
+   *
+   * If the group by function `f` throws an exception and the supervision decision
+   * is [[akka.stream.Supervision#stop]] the stream and substreams will be completed
+   * with failure.
+   *
+   * If the group by function `f` throws an exception and the supervision decision
+   * is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]]
+   * the element is dropped and the stream and substreams continue.
+   *
+   * Function `f` MUST NOT return `null`. This will throw exception and trigger supervision decision mechanism.
+   *
+   * '''Emits when''' an element for which the grouping function returns a group that has not yet been created.
+   * Emits the new group
+   *
+   * '''Backpressures when''' there is an element pending for a group whose substream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels and all substreams cancel
+   *
+   * @param maxSubstreams configures the maximum number of substreams (keys)
+   *        that are supported; if more distinct keys are encountered then the stream fails
+   * @param f computes the key for each element
+   * @param allowClosedSubstreamRecreation enables recreation of already closed substreams if elements with their
+   *        corresponding keys arrive after completion
+   */
+  def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K], allowClosedSubstreamRecreation: Boolean): SubSource[Out, Mat] =
+    new SubSource(delegate.groupBy(maxSubstreams, f.apply, allowClosedSubstreamRecreation))
+
   /**
    * This operation demultiplexes the incoming stream into separate output
    * streams, one for each element key. The key is computed for each element
@@ -2360,6 +2592,16 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
   def splitWhen(p: function.Predicate[Out]): SubSource[Out, Mat] =
     new SubSource(delegate.splitWhen(p.test))

+  /**
+   * This operation applies the given predicate to all incoming elements and
+   * emits them to a stream of output streams, always beginning a new one with
+   * the current element if the given predicate returns true for it.
+   *
+   * @see [[#splitWhen]]
+   */
+  def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] =
+    new SubSource(delegate.splitWhen(substreamCancelStrategy)(p.test))
+
   /**
    * This operation applies the given predicate to all incoming elements and
    * emits them to a stream of output streams. It *ends* the current substream when the
@@ -2407,6 +2649,16 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
   def splitAfter(p: function.Predicate[Out]): SubSource[Out, Mat] =
     new SubSource(delegate.splitAfter(p.test))

+  /**
+   * This operation applies the given predicate to all incoming elements and
+   * emits them to a stream of output streams. It *ends* the current substream when the
+   * predicate is true.
+   *
+   * @see [[#splitAfter]]
+   */
+  def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] =
+    new SubSource(delegate.splitAfter(substreamCancelStrategy)(p.test))
+
   /**
    * Transform each input element into a `Source` of output elements that is
    * then flattened into the output stream by concatenation,
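A minimal sketch of the three-argument `groupBy` added above (not part of the patch; the word data, key function, class name and setup are illustrative assumptions). Setting `allowClosedSubstreamRecreation` to `true` means a key whose substream was already closed gets a fresh substream instead of having its later elements dropped, at the cost of the race noted in the scaladoc:

```java
import java.util.Arrays;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class GroupByExample {
  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("group-by-example");
    final Materializer mat = ActorMaterializer.create(system);

    final Source<String, NotUsed> words = Source.from(Arrays.asList("apple", "ant", "bee", "bear"));

    words
      // Demultiplex by first letter into at most 26 substreams, allowing closed
      // substreams to be recreated if their key shows up again later.
      .groupBy(26, w -> w.substring(0, 1), true)
      .map(w -> w.substring(0, 1) + ": " + w) // applied within each substream
      .mergeSubstreams()                      // fold the substreams back into one Source
      .runWith(Sink.foreach(System.out::println), mat);
  }
}
```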